feat(managed-job): add restore from failed state
This commit is contained in:
parent
64e84c8d0f
commit
dd700b6007
2
go.mod
2
go.mod
@ -5,7 +5,7 @@ go 1.23.2
|
|||||||
require (
|
require (
|
||||||
gitea.com/logicamp/lc v1.14.6
|
gitea.com/logicamp/lc v1.14.6
|
||||||
github.com/dgraph-io/badger/v4 v4.5.0
|
github.com/dgraph-io/badger/v4 v4.5.0
|
||||||
github.com/logi-camp/go-flink-client v0.1.1
|
github.com/logi-camp/go-flink-client v0.1.3
|
||||||
github.com/matoous/go-nanoid/v2 v2.1.0
|
github.com/matoous/go-nanoid/v2 v2.1.0
|
||||||
github.com/tidwall/buntdb v1.3.2
|
github.com/tidwall/buntdb v1.3.2
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@ -264,6 +264,10 @@ github.com/logi-camp/go-flink-client v0.1.0 h1:uzBV6RGkyzZVdSQ8zAlUGbJ5hdXRspaPj
|
|||||||
github.com/logi-camp/go-flink-client v0.1.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
github.com/logi-camp/go-flink-client v0.1.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||||
github.com/logi-camp/go-flink-client v0.1.1 h1:N/XLXE/WZGPKgJo8pN+Q3JrHBOoZHwUoditd399coRo=
|
github.com/logi-camp/go-flink-client v0.1.1 h1:N/XLXE/WZGPKgJo8pN+Q3JrHBOoZHwUoditd399coRo=
|
||||||
github.com/logi-camp/go-flink-client v0.1.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
github.com/logi-camp/go-flink-client v0.1.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||||
|
github.com/logi-camp/go-flink-client v0.1.2 h1:LH9SD7jbVi6CJ9ZTyIAmlLGqLV0KZEIQulZSZiVuMr8=
|
||||||
|
github.com/logi-camp/go-flink-client v0.1.2/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||||
|
github.com/logi-camp/go-flink-client v0.1.3 h1:YaVH0yJUIcZn8KPodNLXKuo2T394ph9XUEGUhU8+sDQ=
|
||||||
|
github.com/logi-camp/go-flink-client v0.1.3/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
||||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||||
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
||||||
|
|||||||
@ -45,7 +45,7 @@ func (job *ManagedJob) cycle() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Check for set running or error state
|
// Check for set running or error state
|
||||||
if job.state.Status == JobStatusCreating {
|
if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing {
|
||||||
err := job.checkStatus()
|
err := job.checkStatus()
|
||||||
if errors.Is(err, ErrNoJobId) {
|
if errors.Is(err, ErrNoJobId) {
|
||||||
job.state = nil
|
job.state = nil
|
||||||
@ -67,5 +67,10 @@ func (job *ManagedJob) cycle() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if job.state.Status == JobStatusFailed {
|
||||||
|
job.restore()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status)))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
34
internal/managed_job/restore.go
Normal file
34
internal/managed_job/restore.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package managed_job
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gitea.com/logicamp/lc"
|
||||||
|
api "github.com/logi-camp/go-flink-client"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// restore the job from savepoint and jarId in managedJob
|
||||||
|
func (job *ManagedJob) restore() error {
|
||||||
|
if job.state.LastSavepointPath == nil {
|
||||||
|
lc.Logger.Error("[managed-job] [restore]", zap.Error(ErrNoSavepointPath))
|
||||||
|
return ErrNoSavepointPath
|
||||||
|
}
|
||||||
|
lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.state.LastSavepointPath))
|
||||||
|
runJarResp, err := job.client.RunJar(api.RunOpts{
|
||||||
|
JarID: job.jarId,
|
||||||
|
AllowNonRestoredState: true,
|
||||||
|
EntryClass: job.def.Spec.EntryClass,
|
||||||
|
SavepointPath: *job.state.LastSavepointPath,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
||||||
|
|
||||||
|
job.state.JobId = &runJarResp.JobId
|
||||||
|
job.state.Status = JobStatusCreating
|
||||||
|
job.state.Error = nil
|
||||||
|
job.updateState(*job.state)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
@ -33,7 +33,7 @@ func (job ManagedJob) trackSavepoint() error {
|
|||||||
return ErrNoSavepointTriggerId
|
return ErrNoSavepointTriggerId
|
||||||
}
|
}
|
||||||
resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId)
|
resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId)
|
||||||
lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("resp", resp), zap.Error(err))
|
lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
||||||
job.removeSavepointTriggerId()
|
job.removeSavepointTriggerId()
|
||||||
|
|||||||
@ -45,7 +45,7 @@ func (job *ManagedJob) setError(errMsg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (job *ManagedJob) setSavepointLocation(savepointId string) {
|
func (job *ManagedJob) setSavepointLocation(savepointId string) {
|
||||||
job.state.LastSavepointLocation = &savepointId
|
job.state.LastSavepointPath = &savepointId
|
||||||
job.state.SavepointTriggerId = nil
|
job.state.SavepointTriggerId = nil
|
||||||
n := time.Now()
|
n := time.Now()
|
||||||
job.state.LastSavepointDate = &n
|
job.state.LastSavepointDate = &n
|
||||||
|
|||||||
@ -10,22 +10,31 @@ type JobStatus string
|
|||||||
var (
|
var (
|
||||||
ErrNoJobId = errors.New("[managed-job] no job id")
|
ErrNoJobId = errors.New("[managed-job] no job id")
|
||||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||||
|
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
JobStatusRunning JobStatus = "RUNNING"
|
JobStatusInitializing JobStatus = "INITIALIZING"
|
||||||
JobStatusCreating JobStatus = "CREATING"
|
JobStatusRunning JobStatus = "RUNNING"
|
||||||
JobStatusNotFound JobStatus = "NotFound"
|
JobStatusCreating JobStatus = "CREATING"
|
||||||
JobStatusError JobStatus = "ERROR"
|
JobStatusNotFound JobStatus = "NotFound"
|
||||||
JobStatusReconciling JobStatus = "RECONCILING"
|
JobStatusError JobStatus = "ERROR"
|
||||||
|
JobStatusReconciling JobStatus = "RECONCILING"
|
||||||
|
JobStatusFailed JobStatus = "FAILED"
|
||||||
|
JobStatusFailing JobStatus = "FAILING"
|
||||||
|
JobStatusRestarting JobStatus = "RESTARTING"
|
||||||
|
JobStatusFinished JobStatus = "FINISHED"
|
||||||
|
JobStatusCanceled JobStatus = "CANCELED"
|
||||||
|
JobStatusCancelling JobStatus = "CANCELLING"
|
||||||
|
JobStatusSuspended JobStatus = "SUSPENDED"
|
||||||
)
|
)
|
||||||
|
|
||||||
type jobState struct {
|
type jobState struct {
|
||||||
Status JobStatus `json:"status"`
|
Status JobStatus `json:"status"`
|
||||||
Error *string `json:"error"`
|
Error *string `json:"error"`
|
||||||
Info *string `json:"info"`
|
Info *string `json:"info"`
|
||||||
JobId *string `json:"job_id"`
|
JobId *string `json:"job_id"`
|
||||||
LastSavepointLocation *string `json:"last_savepoint_location"`
|
LastSavepointPath *string `json:"last_savepoint_location"`
|
||||||
SavepointTriggerId *string `json:"savepoint_trigger_id"`
|
SavepointTriggerId *string `json:"savepoint_trigger_id"`
|
||||||
LastSavepointDate *time.Time `json:"last_savepoint_time"`
|
LastSavepointDate *time.Time `json:"last_savepoint_time"`
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user