package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" "strings" "time" "gitea.com/logicamp/lc" "go.uber.org/zap" ) // func (job *ManagedJob) startCycle() { // ticker := time.NewTicker(5 * time.Second) // quit := make(chan struct{}) // // load job state from db // job.loadState() // go func() { // for { // select { // case <-ticker.C: // job.cycle() // case <-quit: // ticker.Stop() // return // } // } // }() // } func (job *ManagedJob) Cycle() { lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID))) // Init job if job.def.Status.JobStatus == "" { if job.def.Status.LastSavepointPath == nil { if job.def.Status.JarId == nil { err := job.upload() if err != nil { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": "[upload-error] " + err.Error(), }, }) return } } for { err := job.run() if err != nil { if strings.ContainsAny(err.Error(), ".jar does not exist") { err := job.upload() if err != nil { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": "[upload-error] " + err.Error(), }, }) return } continue } job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": "[run-error] " + err.Error(), }, }) return } return } } else { job.restore() } return } // job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{ // JobStatus: job.def.Status.JobStatus, // }) // Check for set running or error state /* if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing { err := job.checkStatus() if errors.Is(err, ErrNoJobId) { job.state = nil } return } */ if job.def.Status.JobStatus == v1alpha1.JobStatusRunning { if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.def.Status.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.def.Status.LastSavepointDate)) { if job.def.Status.SavepointTriggerId == nil { job.createSavepoint() } else { job.trackSavepoint() } } return } if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil { job.restore() return } lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus))) }