package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" "strings" "time" "flink-kube-operator/pkg" api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) // run the job from savepoint and jarId in managedJob func (job *ManagedJob) run() error { var savepointPath string if job.def.Status.LastSavepointPath == nil { pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) //return v1alpha1.ErrNoSavepointPath } else { savepointPath = *job.def.Status.LastSavepointPath } pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath)) var jobId *string for { shouldUpload := false if job.def.Status.JarId == nil { err := v1alpha1.ErrNoJarId pkg.Logger.Error("[managed-job] [run]", zap.Error(err)) shouldUpload = true } else { runJarResp, err := job.client.RunJar(api.RunOpts{ JarID: *job.def.Status.JarId, AllowNonRestoredState: true, EntryClass: job.def.Spec.EntryClass, SavepointPath: savepointPath, }) if err == nil { pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp)) jobId = &runJarResp.JobId break } else { if strings.ContainsAny(err.Error(), ".jar does not exist") { shouldUpload = true } else { pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) } } } if shouldUpload { err := job.upload() if err != nil { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": "[upload-error] " + err.Error(), }, }) return nil } continue } return nil } // job.def.Status.JobId = &runJarResp.JobId // job.def.Status.JobStatus = v1alpha1.JobStatusCreating // job.def.Status.Error = nil job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "jobId": jobId, "runningJarURI": job.def.Spec.JarURI, "jobStatus": v1alpha1.JobStatusCreating, "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, "lastRestoredSavepointDate": job.def.Status.LastSavepointDate, "restoredCount": job.def.Status.RestoredCount + 1, "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), "error": nil, }, }) return nil }