From dd700b6007059b63642b5cf9bd9014be431c3bdd Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sun, 1 Dec 2024 00:56:16 +0330 Subject: [PATCH] feat(managed-job): add restore from failed state --- go.mod | 2 +- go.sum | 4 ++++ internal/managed_job/cycle.go | 7 ++++++- internal/managed_job/restore.go | 34 +++++++++++++++++++++++++++++++ internal/managed_job/savepoint.go | 2 +- internal/managed_job/state.go | 2 +- internal/managed_job/type.go | 33 +++++++++++++++++++----------- 7 files changed, 68 insertions(+), 16 deletions(-) create mode 100644 internal/managed_job/restore.go diff --git a/go.mod b/go.mod index c9d5abd..915e7ee 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.2 require ( gitea.com/logicamp/lc v1.14.6 github.com/dgraph-io/badger/v4 v4.5.0 - github.com/logi-camp/go-flink-client v0.1.1 + github.com/logi-camp/go-flink-client v0.1.3 github.com/matoous/go-nanoid/v2 v2.1.0 github.com/tidwall/buntdb v1.3.2 go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index a4a1cae..c0ba322 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,10 @@ github.com/logi-camp/go-flink-client v0.1.0 h1:uzBV6RGkyzZVdSQ8zAlUGbJ5hdXRspaPj github.com/logi-camp/go-flink-client v0.1.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/logi-camp/go-flink-client v0.1.1 h1:N/XLXE/WZGPKgJo8pN+Q3JrHBOoZHwUoditd399coRo= github.com/logi-camp/go-flink-client v0.1.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.1.2 h1:LH9SD7jbVi6CJ9ZTyIAmlLGqLV0KZEIQulZSZiVuMr8= +github.com/logi-camp/go-flink-client v0.1.2/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.1.3 h1:YaVH0yJUIcZn8KPodNLXKuo2T394ph9XUEGUhU8+sDQ= +github.com/logi-camp/go-flink-client v0.1.3/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index c7b7312..67bccec 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -45,7 +45,7 @@ func (job *ManagedJob) cycle() { return } // Check for set running or error state - if job.state.Status == JobStatusCreating { + if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing { err := job.checkStatus() if errors.Is(err, ErrNoJobId) { job.state = nil @@ -67,5 +67,10 @@ func (job *ManagedJob) cycle() { } return } + if job.state.Status == JobStatusFailed { + job.restore() + return + } + lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status))) } diff --git a/internal/managed_job/restore.go b/internal/managed_job/restore.go new file mode 100644 index 0000000..e917c5d --- /dev/null +++ b/internal/managed_job/restore.go @@ -0,0 +1,34 @@ +package managed_job + +import ( + "gitea.com/logicamp/lc" + api "github.com/logi-camp/go-flink-client" + "go.uber.org/zap" +) + +// restore the job from savepoint and jarId in managedJob +func (job *ManagedJob) restore() error { + if job.state.LastSavepointPath == nil { + lc.Logger.Error("[managed-job] [restore]", zap.Error(ErrNoSavepointPath)) + return ErrNoSavepointPath + } + lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.state.LastSavepointPath)) + runJarResp, err := job.client.RunJar(api.RunOpts{ + JarID: job.jarId, + AllowNonRestoredState: true, + EntryClass: job.def.Spec.EntryClass, + SavepointPath: *job.state.LastSavepointPath, + }) + if err != nil { + lc.Logger.Error("[managed-job] [run]", zap.Error(err)) + return err + } + lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) + + job.state.JobId = &runJarResp.JobId + job.state.Status = JobStatusCreating + job.state.Error = nil + job.updateState(*job.state) + + return err +} diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index ba9c799..89f5ac1 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -33,7 +33,7 @@ func (job ManagedJob) trackSavepoint() error { return ErrNoSavepointTriggerId } resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId) - lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("resp", resp), zap.Error(err)) + lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err)) if err != nil { if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { job.removeSavepointTriggerId() diff --git a/internal/managed_job/state.go b/internal/managed_job/state.go index 0334387..44e02e9 100644 --- a/internal/managed_job/state.go +++ b/internal/managed_job/state.go @@ -45,7 +45,7 @@ func (job *ManagedJob) setError(errMsg string) { } func (job *ManagedJob) setSavepointLocation(savepointId string) { - job.state.LastSavepointLocation = &savepointId + job.state.LastSavepointPath = &savepointId job.state.SavepointTriggerId = nil n := time.Now() job.state.LastSavepointDate = &n diff --git a/internal/managed_job/type.go b/internal/managed_job/type.go index fa9c0b9..3bca15d 100644 --- a/internal/managed_job/type.go +++ b/internal/managed_job/type.go @@ -10,22 +10,31 @@ type JobStatus string var ( ErrNoJobId = errors.New("[managed-job] no job id") ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id") + ErrNoSavepointPath = errors.New("[managed-job] no savepoint path") ) const ( - JobStatusRunning JobStatus = "RUNNING" - JobStatusCreating JobStatus = "CREATING" - JobStatusNotFound JobStatus = "NotFound" - JobStatusError JobStatus = "ERROR" - JobStatusReconciling JobStatus = "RECONCILING" + JobStatusInitializing JobStatus = "INITIALIZING" + JobStatusRunning JobStatus = "RUNNING" + JobStatusCreating JobStatus = "CREATING" + JobStatusNotFound JobStatus = "NotFound" + JobStatusError JobStatus = "ERROR" + JobStatusReconciling JobStatus = "RECONCILING" + JobStatusFailed JobStatus = "FAILED" + JobStatusFailing JobStatus = "FAILING" + JobStatusRestarting JobStatus = "RESTARTING" + JobStatusFinished JobStatus = "FINISHED" + JobStatusCanceled JobStatus = "CANCELED" + JobStatusCancelling JobStatus = "CANCELLING" + JobStatusSuspended JobStatus = "SUSPENDED" ) type jobState struct { - Status JobStatus `json:"status"` - Error *string `json:"error"` - Info *string `json:"info"` - JobId *string `json:"job_id"` - LastSavepointLocation *string `json:"last_savepoint_location"` - SavepointTriggerId *string `json:"savepoint_trigger_id"` - LastSavepointDate *time.Time `json:"last_savepoint_time"` + Status JobStatus `json:"status"` + Error *string `json:"error"` + Info *string `json:"info"` + JobId *string `json:"job_id"` + LastSavepointPath *string `json:"last_savepoint_location"` + SavepointTriggerId *string `json:"savepoint_trigger_id"` + LastSavepointDate *time.Time `json:"last_savepoint_time"` }