From 89702d287af71b19116b0d5b82685cb9a2462027 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Fri, 13 Dec 2024 14:57:55 +0330 Subject: [PATCH] fix(savepoint): missing savepoint path env and not handled savepoint errors --- internal/managed_job/restore.go | 75 -------------------- internal/managed_job/run.go | 111 ++++++++++++++---------------- internal/managed_job/savepoint.go | 41 +++++++---- 3 files changed, 81 insertions(+), 146 deletions(-) delete mode 100644 internal/managed_job/restore.go diff --git a/internal/managed_job/restore.go b/internal/managed_job/restore.go deleted file mode 100644 index 5983696..0000000 --- a/internal/managed_job/restore.go +++ /dev/null @@ -1,75 +0,0 @@ -package managed_job - -import ( - "errors" - "flink-kube-operator/internal/crd/v1alpha1" - "strings" - "time" - - "flink-kube-operator/pkg" - - api "github.com/logi-camp/go-flink-client" - "go.uber.org/zap" -) - -// restore the job from savepoint and jarId in managedJob -func (job *ManagedJob) restore() error { - var savepointPath string - if job.def.Status.LastSavepointPath == nil { - pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) - //return v1alpha1.ErrNoSavepointPath - } else { - savepointPath = *job.def.Status.LastSavepointPath - } - if job.def.Status.JarId == nil { - err := errors.New("missing jar id") - pkg.Logger.Error("[managed-job] [restore]", zap.Error(err)) - return err - } - pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath)) - var jobId *string - for { - runJarResp, err := job.client.RunJar(api.RunOpts{ - JarID: *job.def.Status.JarId, - AllowNonRestoredState: true, - EntryClass: job.def.Spec.EntryClass, - SavepointPath: savepointPath, - }) - if err != nil { - if strings.ContainsAny(err.Error(), ".jar does not exist") { - err := job.upload() - if err != nil { - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "error": "[upload-error] " + err.Error(), - }, - }) - return nil - } - continue - } - pkg.Logger.Error("[managed-job] [restore]", zap.Error(err)) - return err - } - jobId = &runJarResp.JobId - pkg.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) - break - } - - // job.def.Status.JobId = &runJarResp.JobId - // job.def.Status.JobStatus = v1alpha1.JobStatusCreating - // job.def.Status.Error = nil - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "jobId": jobId, - "jobStatus": v1alpha1.JobStatusCreating, - "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, - "lastRestoredSavepointDate": job.def.Status.LastSavepointDate, - "restoredCount": job.def.Status.RestoredCount + 1, - "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), - "error": nil, - }, - }) - - return nil -} diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index 0878f09..5983696 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -3,7 +3,8 @@ package managed_job import ( "errors" "flink-kube-operator/internal/crd/v1alpha1" - "flink-kube-operator/internal/jar" + "strings" + "time" "flink-kube-operator/pkg" @@ -11,70 +12,64 @@ import ( "go.uber.org/zap" ) -// upload jar file and set the jarId for later usages -func (job *ManagedJob) upload() error { - jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) - if err != nil { - pkg.Logger.Debug("[main] error on download jar", zap.Error(err)) +// restore the job from savepoint and jarId in managedJob +func (job *ManagedJob) restore() error { + var savepointPath string + if job.def.Status.LastSavepointPath == nil { + pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) + //return v1alpha1.ErrNoSavepointPath + } else { + savepointPath = *job.def.Status.LastSavepointPath + } + if job.def.Status.JarId == nil { + err := errors.New("missing jar id") + pkg.Logger.Error("[managed-job] [restore]", zap.Error(err)) return err } - fileName, err := jarFile.Upload(job.client) - if err != nil { - pkg.Logger.Debug("[main] error on upload jar", zap.Error(err)) - return err + pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath)) + var jobId *string + for { + runJarResp, err := job.client.RunJar(api.RunOpts{ + JarID: *job.def.Status.JarId, + AllowNonRestoredState: true, + EntryClass: job.def.Spec.EntryClass, + SavepointPath: savepointPath, + }) + if err != nil { + if strings.ContainsAny(err.Error(), ".jar does not exist") { + err := job.upload() + if err != nil { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "error": "[upload-error] " + err.Error(), + }, + }) + return nil + } + continue + } + pkg.Logger.Error("[managed-job] [restore]", zap.Error(err)) + return err + } + jobId = &runJarResp.JobId + pkg.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) + break } - err = jarFile.Delete() - if err != nil { - pkg.Logger.Debug("[main] error on delete jar", zap.Error(err)) - } - pkg.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", fileName)) - job.def.Status.JarId = &fileName + // job.def.Status.JobId = &runJarResp.JobId + // job.def.Status.JobStatus = v1alpha1.JobStatusCreating + // job.def.Status.Error = nil job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ - "jarId": job.def.Status.JarId, + "jobId": jobId, + "jobStatus": v1alpha1.JobStatusCreating, + "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, + "lastRestoredSavepointDate": job.def.Status.LastSavepointDate, + "restoredCount": job.def.Status.RestoredCount + 1, + "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), + "error": nil, }, }) + return nil } - -// run the job from saved jarId in managedJob -func (job *ManagedJob) run() error { - if job.def.Status.JarId == nil { - err := errors.New("missing jar id") - pkg.Logger.Error("[managed-job] [run]", zap.Error(err)) - return err - } - pkg.Logger.Info("[managed-job] [run] starting job", zap.String("name", job.def.GetName())) - runJarResp, err := job.client.RunJar(api.RunOpts{ - JarID: *job.def.Status.JarId, - AllowNonRestoredState: true, - EntryClass: job.def.Spec.EntryClass, - }) - if err != nil { - pkg.Logger.Error("[managed-job] [run]", zap.Error(err)) - return err - } - pkg.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) - - // if job.state == nil { - // job.state = &jobState{} - // } - // job.state.JobId = &runJarResp.JobId - // job.state.Status = v1alpha1.JobStatusCreating - // job.updateState(*job.state) - // job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{ - // JobId: job.state.JobId, - // }) - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "jobId": &runJarResp.JobId, - "jobStatus": v1alpha1.JobStatusCreating, - "lifeCycleStatus": v1alpha1.LifeCycleStatusInitializing, - "error": nil, - }, - }) - //job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating}) - - return err -} diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index ee177c4..377666d 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -2,6 +2,7 @@ package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" + "os" "strings" "time" @@ -17,7 +18,7 @@ func (job ManagedJob) createSavepoint() error { return v1alpha1.ErrNoJobId } pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String())) - resp, err := job.client.SavePoints(*job.def.Status.JobId, "/flink-data/savepoints-2/", false) + resp, err := job.client.SavePoints(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false) if err != nil { pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) return err @@ -42,24 +43,38 @@ func (job ManagedJob) trackSavepoint() error { return v1alpha1.ErrNoSavepointTriggerId } resp, err := job.client.TrackSavepoint(*job.def.Status.JobId, *job.def.Status.SavepointTriggerId) - pkg.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err)) - if err != nil { - if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { + pkg.Logger.Debug("[managed-job] [savepoint] track savepoint", + zap.Any("status.Id", resp.Status.Id), + zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), + zap.Any("failureCause.class", resp.Operation.FailureCause.Class), + zap.Error(err), + ) + if err != nil || resp.Operation.FailureCause.Class != "" { + if err != nil { + if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "savepointTriggerId": nil, + }, + }) + } + } else { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ - "savepointTriggerId": nil, + "error": resp.Operation.FailureCause.StackTrace, + }, + }) + } + } else { + if resp.Status.Id == api.SavepointStatusInCompleted { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "lastSavepointPath": resp.Operation.Location, + "lastSavepointDate": time.Now().Format(time.RFC3339), }, }) } } - if resp.Status.Id == api.SavepointStatusInCompleted { - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "lastSavepointPath": resp.Operation.Location, - "lastSavepointDate": time.Now().Format(time.RFC3339), - }, - }) - } return nil }