From 8f4cb093b1ae39ee3c3b8e074a0ee7d0edf781e7 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sat, 30 Nov 2024 02:11:30 +0330 Subject: [PATCH] feat(savepoint): create and track for getting savepoint location --- go.mod | 2 +- go.sum | 2 ++ internal/managed_job/cycle.go | 14 +++++++------- internal/managed_job/savepoint.go | 27 +++++++++++++++++++++++++++ internal/managed_job/state.go | 20 ++++++++++++++++++-- internal/managed_job/status.go | 7 ++----- internal/managed_job/type.go | 16 +++++++++------- 7 files changed, 66 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index da28c6d..274e0ab 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/klauspost/compress v1.17.11 // indirect - github.com/logi-camp/go-flink-client v0.0.3 // indirect + github.com/logi-camp/go-flink-client v0.1.0 // indirect github.com/matoous/go-nanoid/v2 v2.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 2ea9b6e..8d28637 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/logi-camp/go-flink-client v0.0.1 h1:VZI5ctoJEArwtBprIXZNk+cv5fEQZ4Ecu github.com/logi-camp/go-flink-client v0.0.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/logi-camp/go-flink-client v0.0.3 h1:NT9FYJG7jqroq44jfJfRnva3NO/jgpgURcU0mFysK1A= github.com/logi-camp/go-flink-client v0.0.3/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.1.0 h1:uzBV6RGkyzZVdSQ8zAlUGbJ5hdXRspaPjzzYJUQ2aJU= +github.com/logi-camp/go-flink-client v0.1.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE= github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 513c7d3..9bdb248 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -58,14 +58,14 @@ func (job *ManagedJob) cycle() { if errors.Is(err, ErrNoJobId) { job.state = nil } + if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) { + if job.state.SavepointTriggerId == nil { + job.createSavepoint() + } else { + job.trackSavepoint() + } + } return } - //if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) { - err := job.createSavepoint() - if errors.Is(err, ErrNoJobId) { - job.state = nil - } - //} - } diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index 4b74183..ba9c799 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -1,7 +1,10 @@ package managed_job import ( + "strings" + "gitea.com/logicamp/lc" + api "github.com/logi-camp/go-flink-client" "go.uber.org/zap" ) @@ -16,5 +19,29 @@ func (job ManagedJob) createSavepoint() error { return err } lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp)) + job.setSavepointTriggerId(resp.RequestID) + return nil +} + +func (job ManagedJob) trackSavepoint() error { + if job.state.JobId == nil { + lc.Logger.Debug("[managed-job] [savepoint] no job id") + return ErrNoJobId + } + if job.state.SavepointTriggerId == nil { + lc.Logger.Debug("[managed-job] [savepoint] no job id") + return ErrNoSavepointTriggerId + } + resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId) + lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("resp", resp), zap.Error(err)) + if err != nil { + if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { + job.removeSavepointTriggerId() + } + } + if resp.Status.Id == api.SavepointStatusInCompleted { + job.setSavepointLocation(resp.Operation.Location) + } + return nil } diff --git a/internal/managed_job/state.go b/internal/managed_job/state.go index 225c420..583eb41 100644 --- a/internal/managed_job/state.go +++ b/internal/managed_job/state.go @@ -47,9 +47,25 @@ func (job *ManagedJob) setError(errMsg string) { job.updateState(*job.state) } -func (job *ManagedJob) setSavepointId(savepointId string) { - job.state.LastSavepointId = &savepointId +func (job *ManagedJob) setSavepointLocation(savepointId string) { + job.state.LastSavepointLocation = &savepointId + job.state.SavepointTriggerId = nil n := time.Now() job.state.LastSavepointDate = &n job.updateState(*job.state) } + +func (job *ManagedJob) setSavepointTriggerId(savepointReqId string) { + job.state.SavepointTriggerId = &savepointReqId + job.updateState(*job.state) +} + +func (job *ManagedJob) removeSavepointTriggerId() { + job.state.SavepointTriggerId = nil + job.updateState(*job.state) +} + +func (job *ManagedJob) setStatus(status JobStatus) { + job.state.Status = status + job.updateState(*job.state) +} diff --git a/internal/managed_job/status.go b/internal/managed_job/status.go index 5ca6fb1..274cad6 100644 --- a/internal/managed_job/status.go +++ b/internal/managed_job/status.go @@ -23,10 +23,7 @@ func (job *ManagedJob) checkStatus() error { } return err } - lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp)) - job.updateState(jobState{ - JobId: job.state.JobId, - Status: JobStatus(statusResp.State), - }) + //lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp)) + job.setStatus(JobStatus(statusResp.State)) return err } diff --git a/internal/managed_job/type.go b/internal/managed_job/type.go index 0421b34..fa9c0b9 100644 --- a/internal/managed_job/type.go +++ b/internal/managed_job/type.go @@ -8,7 +8,8 @@ import ( type JobStatus string var ( - ErrNoJobId = errors.New("[managed-job] no job id") + ErrNoJobId = errors.New("[managed-job] no job id") + ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id") ) const ( @@ -20,10 +21,11 @@ const ( ) type jobState struct { - Status JobStatus `json:"status"` - Error *string `json:"error"` - Info *string `json:"info"` - JobId *string `json:"job_id"` - LastSavepointId *string `json:"last_savepoint_id"` - LastSavepointDate *time.Time `json:"last_savepoint_time"` + Status JobStatus `json:"status"` + Error *string `json:"error"` + Info *string `json:"info"` + JobId *string `json:"job_id"` + LastSavepointLocation *string `json:"last_savepoint_location"` + SavepointTriggerId *string `json:"savepoint_trigger_id"` + LastSavepointDate *time.Time `json:"last_savepoint_time"` }