package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" "os" "strings" "time" "flink-kube-operator/pkg" api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) func (job ManagedJob) createSavepoint() error { if job.def.Status.JobId == nil { pkg.Logger.Debug("[managed-job] [savepoint] no job id") return v1alpha1.ErrNoJobId } pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String())) resp, err := job.client.SavePoints(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false) if err != nil { pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) return err } pkg.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp)) job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "savepointTriggerId": resp.RequestID, }, }) return nil } func (job ManagedJob) trackSavepoint() error { if job.def.Status.JobId == nil { pkg.Logger.Debug("[managed-job] [savepoint] no job id") return v1alpha1.ErrNoJobId } if job.def.Status.SavepointTriggerId == nil { pkg.Logger.Debug("[managed-job] [savepoint] no job id") return v1alpha1.ErrNoSavepointTriggerId } resp, err := job.client.TrackSavepoint(*job.def.Status.JobId, *job.def.Status.SavepointTriggerId) pkg.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Any("failureCause.class", resp.Operation.FailureCause.Class), zap.Error(err), ) if err != nil || resp.Operation.FailureCause.Class != "" { if err != nil { if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "savepointTriggerId": nil, }, }) } } else { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "error": resp.Operation.FailureCause.StackTrace, }, }) } } else { if resp.Status.Id == api.SavepointStatusInCompleted { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "lastSavepointPath": resp.Operation.Location, "lastSavepointDate": time.Now().Format(time.RFC3339), }, }) } } return nil }