package managed_job

import (
	"errors"
	"strings"
	"time"

	"flink-kube-operator/internal/crd/v1alpha1"
	"flink-kube-operator/pkg"

	api "github.com/logi-camp/go-flink-client"
	"go.uber.org/zap"
)

// restore runs the jar recorded in job.def.Status.JarId through
// job.client.RunJar, passing job.def.Status.LastSavepointPath as the savepoint
// when one is recorded, and then patches the resource status with the new job
// id and restore bookkeeping.
//
// A missing savepoint path is logged but is not fatal: the run is submitted
// with an empty SavepointPath. A missing jar id is fatal and is returned as an
// error.
//
// When RunJar fails with a message containing ".jar does not exist", the jar
// is uploaded again via job.upload and the run is retried. Any other RunJar
// error is logged and returned to the caller.
func (job *ManagedJob) restore() error {
	var savepointPath string
	if job.def.Status.LastSavepointPath == nil {
		// Logged only; the restore continues without a savepoint.
		pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
	} else {
		savepointPath = *job.def.Status.LastSavepointPath
	}

	if job.def.Status.JarId == nil {
		err := errors.New("missing jar id")
		pkg.Logger.Error("[managed-job] [restore]", zap.Error(err))
		return err
	}

	pkg.Logger.Info(
		"[managed-job] [restore] restoring job",
		zap.String("name", job.def.GetName()),
		zap.String("savepointPath", savepointPath),
	)

	var jobId *string
	for {
		runJarResp, err := job.client.RunJar(api.RunOpts{
			JarID:                 *job.def.Status.JarId,
			AllowNonRestoredState: true,
			EntryClass:            job.def.Spec.EntryClass,
			SavepointPath:         savepointPath,
		})
		if err != nil {
			// Substring match, not strings.ContainsAny: ContainsAny treats its
			// argument as a character set and would match almost every error
			// message, turning any failure into an endless upload/retry cycle.
			if strings.Contains(err.Error(), ".jar does not exist") {
				if uploadErr := job.upload(); uploadErr != nil {
					job.crd.Patch(job.def.UID, map[string]interface{}{
						"status": map[string]interface{}{
							"error": "[upload-error] " + uploadErr.Error(),
						},
					})
					// NOTE(review): the upload failure is recorded on the
					// resource status and nil is returned, so the caller sees
					// success. Kept as-is; confirm this is intentional.
					return nil
				}
				// Jar re-uploaded; try to run it again.
				continue
			}
			pkg.Logger.Error("[managed-job] [restore]", zap.Error(err))
			return err
		}
		jobId = &runJarResp.JobId
		pkg.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
		break
	}

	// Record the new job and the restore bookkeeping on the resource status,
	// and clear any previously stored error.
	job.crd.Patch(job.def.UID, map[string]interface{}{
		"status": map[string]interface{}{
			"jobId":                             jobId,
			"jobStatus":                         v1alpha1.JobStatusCreating,
			"lifeCycleStatus":                   v1alpha1.LifeCycleStatusRestoring,
			"lastRestoredSavepointDate":         job.def.Status.LastSavepointDate,
			"restoredCount":                     job.def.Status.RestoredCount + 1,
			"lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339),
			"error":                             nil,
		},
	})

	return nil
}