package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" "strings" "time" "flink-kube-operator/pkg" api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) // Run the job from savepoint and jarId in managedJob func (job *ManagedJob) Run(restoreMode bool) error { var savepointPath string if job.def.Status.LastSavepointPath == nil { pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) if restoreMode { return v1alpha1.ErrNoSavepointPath } } else { savepointPath = *job.def.Status.LastSavepointPath } pkg.Logger.Info( "[managed-job] [restore] starting job...", zap.Bool("restoreMode", restoreMode), zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath), ) var jobId *string for { shouldUpload := false if job.def.Status.JarId == nil { err := v1alpha1.ErrNoJarId pkg.Logger.Warn("[managed-job] [run] will upload new jar...", zap.Error(err)) shouldUpload = true } else { runJarResp, err := job.client.RunJar(api.RunOpts{ JarID: *job.def.Status.JarId, AllowNonRestoredState: true, EntryClass: job.def.Spec.EntryClass, SavepointPath: savepointPath, Parallelism: job.def.Spec.Parallelism, ProgramArg: job.def.Spec.Args, }) if err == nil { pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp)) jobId = &runJarResp.JobId break } else { if strings.Contains(err.Error(), ".jar does not exist") { pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) shouldUpload = true } else { pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) stringErr := err.Error() job.def.Status.Error = &stringErr job.def.Status.JobStatus = "" job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed job.crd.SetJobStatus(job.def.UID, job.def.Status) return v1alpha1.ErrOnStartingJob } } } if shouldUpload { err := job.upload() if err != nil { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": "[upload-error] " + err.Error(), }, }) return nil } shouldUpload = false continue } } // job.def.Status.JobId = &runJarResp.JobId // job.def.Status.JobStatus = v1alpha1.JobStatusCreating // job.def.Status.Error = nil job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "jobId": jobId, "runningJarURI": job.def.Spec.JarURI, "jobStatus": v1alpha1.JobStatusCreating, "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, "lastRestoredSavepointDate": job.def.Status.LastSavepointDate, "restoredCount": job.def.Status.RestoredCount + 1, "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), "error": nil, }, }) return nil }