package managed_job import ( "strings" "gitea.com/logicamp/lc" api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) func (job ManagedJob) createSavepoint() error { if job.state.JobId == nil { lc.Logger.Debug("[managed-job] [savepoint] no job id") return ErrNoJobId } lc.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String())) resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false) if err != nil { lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) return err } lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp)) job.setSavepointTriggerId(resp.RequestID) return nil } func (job ManagedJob) trackSavepoint() error { if job.state.JobId == nil { lc.Logger.Debug("[managed-job] [savepoint] no job id") return ErrNoJobId } if job.state.SavepointTriggerId == nil { lc.Logger.Debug("[managed-job] [savepoint] no job id") return ErrNoSavepointTriggerId } resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId) lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err)) if err != nil { if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { job.removeSavepointTriggerId() } } if resp.Status.Id == api.SavepointStatusInCompleted { job.setSavepointLocation(resp.Operation.Location) } return nil }