From 91ccfebfebf3c3c7d9260ee91ac3c8de2cf25216 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sun, 8 Dec 2024 01:24:14 +0330 Subject: [PATCH] feat: retry upload on jar not found --- internal/managed_job/cycle.go | 56 +++++++++++++++++++++---------- internal/managed_job/restore.go | 58 +++++++++++++++++++++------------ internal/managed_job/run.go | 3 +- 3 files changed, 79 insertions(+), 38 deletions(-) diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 942edd5..fe757ce 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -2,6 +2,7 @@ package managed_job import ( "flink-kube-operator/internal/crd/v1alpha1" + "strings" "time" "gitea.com/logicamp/lc" @@ -32,23 +33,44 @@ func (job *ManagedJob) Cycle() { // Init job if job.def.Status.JobStatus == "" { - err := job.upload() - if err != nil { - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "error": "[upload-error] " + err.Error(), - }, - }) - return - } - err = job.run() - if err != nil { - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "error": "[run-error] " + err.Error(), - }, - }) - return + if job.def.Status.LastSavepointPath == nil { + if job.def.Status.JarId == nil { + err := job.upload() + if err != nil { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "error": "[upload-error] " + err.Error(), + }, + }) + return + } + } + for { + err := job.run() + if err != nil { + if strings.ContainsAny(err.Error(), ".jar does not exist") { + err := job.upload() + if err != nil { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "error": "[upload-error] " + err.Error(), + }, + }) + return + } + continue + } + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "error": "[run-error] " + err.Error(), + }, + }) + return + } + return + } + } else { + job.restore() } return } diff --git a/internal/managed_job/restore.go b/internal/managed_job/restore.go index 2b86cab..a84eddf 100644 --- a/internal/managed_job/restore.go +++ b/internal/managed_job/restore.go @@ -3,6 +3,7 @@ package managed_job import ( "errors" "flink-kube-operator/internal/crd/v1alpha1" + "strings" "time" "gitea.com/logicamp/lc" @@ -18,36 +19,53 @@ func (job *ManagedJob) restore() error { } if job.def.Status.JarId == nil { err := errors.New("missing jar id") - lc.Logger.Error("[managed-job] [run]", zap.Error(err)) + lc.Logger.Error("[managed-job] [restore]", zap.Error(err)) return err } - lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.def.Status.LastSavepointPath)) - runJarResp, err := job.client.RunJar(api.RunOpts{ - JarID: *job.def.Status.JarId, - AllowNonRestoredState: true, - EntryClass: job.def.Spec.EntryClass, - SavepointPath: *job.def.Status.LastSavepointPath, - }) - if err != nil { - lc.Logger.Error("[managed-job] [run]", zap.Error(err)) - return err + lc.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", *job.def.Status.LastSavepointPath)) + var jobId *string + for { + runJarResp, err := job.client.RunJar(api.RunOpts{ + JarID: *job.def.Status.JarId, + AllowNonRestoredState: true, + EntryClass: job.def.Spec.EntryClass, + SavepointPath: *job.def.Status.LastSavepointPath, + }) + if err != nil { + if strings.ContainsAny(err.Error(), ".jar does not exist") { + err := job.upload() + if err != nil { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "error": "[upload-error] " + err.Error(), + }, + }) + return nil + } + continue + } + lc.Logger.Error("[managed-job] [restore]", zap.Error(err)) + return err + } + jobId = &runJarResp.JobId + lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) + break } - lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) // job.def.Status.JobId = &runJarResp.JobId // job.def.Status.JobStatus = v1alpha1.JobStatusCreating // job.def.Status.Error = nil job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ - "jobId": &runJarResp.JobId, - "jobStatus": v1alpha1.JobStatusCreating, - "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, - "lastRestoredSavepointDate": job.def.Status.LastRestoredSavepointDate, - "restoredCount": job.def.Status.RestoredCount + 1, - "lastRestoredSavepointRestoreDate": time.Now().Format(time.RFC3339), - "error": nil, + "jobId": jobId, + "jobStatus": v1alpha1.JobStatusCreating, + "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, + "lastRestoredSavepointDate": job.def.Status.LastSavepointDate, + "restoredCount": job.def.Status.RestoredCount + 1, + "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), + "error": nil, }, }) - return err + return nil } diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index c5c2c50..3236abd 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -37,10 +37,11 @@ func (job *ManagedJob) upload() error { // run the job from saved jarId in managedJob func (job *ManagedJob) run() error { if job.def.Status.JarId == nil { - err := errors.New("missing jar id") + err := errors.New("missing jar id") lc.Logger.Error("[managed-job] [run]", zap.Error(err)) return err } + lc.Logger.Info("[managed-job] [run] starting job", zap.String("name", job.def.GetName())) runJarResp, err := job.client.RunJar(api.RunOpts{ JarID: *job.def.Status.JarId, AllowNonRestoredState: true,