package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/pkg" "os" "time" api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) func (job *ManagedJob) pause() error { var err error if job.def.Status.JobId != nil { result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false) if stopJobErr != nil { err = stopJobErr pkg.Logger.Error("[managed-job] [pause] cannot stop job", zap.Error(err)) return err } var savepointPath string for { trackResult, err := job.client.TrackSavepoint(*job.def.Status.JobId, result.RequestID) time.Sleep(time.Millisecond * 500) if err == nil && trackResult.Status.Id == api.SavepointStatusInCompleted { if trackResult.Operation.Location != "" { savepointPath = trackResult.Operation.Location } break } } if savepointPath != "" { job.def.Status.LastSavepointPath = &savepointPath job.def.Status.PauseSavepointTriggerId = nil job.def.Status.JobStatus = "" job.def.Status.LastSavepointPath = &savepointPath lastSavepointDate := time.Now() job.def.Status.LastSavepointDate = &lastSavepointDate job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "jobStatus": "FINISHED", "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefullyPaused, "savepointTriggerId": nil, "lastSavepointPath": savepointPath, "lastSavepointDate": lastSavepointDate.Format(time.RFC3339), }, }) pkg.Logger.Info( "[managed-job] job paused successfully", zap.String("jobName", job.def.GetName()), zap.String("savepointPath", savepointPath), ) } else { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefulStopFailed, "lastSavepointPath": savepointPath, "lastSavepointDate": time.Now().Format(time.RFC3339), }, }) pkg.Logger.Error( "[managed-job] error in pausing job", zap.Error(err), ) return err } } return nil }