From d36b9c30df58c3889b66d6345b768a80b429b807 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Fri, 13 Dec 2024 19:41:59 +0330 Subject: [PATCH] fix: resolve issue with jar upgrade not uploading new jar Ensure the new jar is properly uploaded during an upgrade process. Previously, the jar was not replaced as expected. --- crds.yaml | 2 + internal/crd/finalizer.go | 5 +-- internal/crd/patch.go | 2 - internal/crd/v1alpha1/flink_job.go | 3 ++ internal/managed_job/cycle.go | 13 +++++- internal/managed_job/pause.go | 71 ++++++++++++++++++++++++++++++ internal/managed_job/run.go | 15 +++++-- internal/managed_job/savepoint.go | 2 +- internal/managed_job/upgrade.go | 44 +++++++++++------- internal/managed_job/upload.go | 8 ++-- internal/manager/manager.go | 33 +++++++++----- 11 files changed, 153 insertions(+), 45 deletions(-) create mode 100644 internal/managed_job/pause.go diff --git a/crds.yaml b/crds.yaml index 9e898c2..191d083 100644 --- a/crds.yaml +++ b/crds.yaml @@ -90,6 +90,8 @@ spec: format: time runningJarURI: type: string + pauseSavepointTriggerId: + type: string restoredCount: type: number additionalPrinterColumns: diff --git a/internal/crd/finalizer.go b/internal/crd/finalizer.go index 3c2e837..dc9f599 100644 --- a/internal/crd/finalizer.go +++ b/internal/crd/finalizer.go @@ -1,17 +1,14 @@ package crd import ( - "flink-kube-operator/pkg" - "github.com/reactivex/rxgo/v2" - "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) { for j := range jobEventObservable.Observe() { jobEvent := j.V.(*FlinkJobCrdEvent) - pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName())) + //pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName())) controllerutil.AddFinalizer(jobEvent.Job, "") } } diff --git a/internal/crd/patch.go b/internal/crd/patch.go index 0bc4857..434121a 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -41,8 +41,6 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error newJob, err := convertFromUnstructured(unstructuredJob) if err != nil { pkg.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err)) - } else { - pkg.Logger.Debug("[crd] [status] patched") } jobs[jobUid] = newJob if err != nil { diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 5ca48cc..54acb62 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -26,6 +26,7 @@ type FlinkJobStatus struct { JobId *string `json:"jobId,omitempty"` Error *string `json:"error,omitempty"` SavepointTriggerId *string `json:"savepointTriggerId,omitempty"` + PauseSavepointTriggerId *string `json:"pauseSavepointTriggerId,omitempty"` LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"` LastRestoredSavepointDate *time.Time `json:"lastRestoredSavepointDate,omitempty"` LastRestoredSavepointRestoredDate *time.Time `json:"lastRestoredSavepointRestoredDate,omitempty"` @@ -77,7 +78,9 @@ type LifeCycleStatus string const ( LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING" LifeCycleStatusRestoring LifeCycleStatus = "RESTORING" + LifeCycleStatusGracefulStopFailed LifeCycleStatus = "GRACEFUL_STOP_FAILED" LifeCycleStatusUpgradeFailed LifeCycleStatus = "UPGRADE_FAILED" + LifeCycleStatusGracefullyPaused LifeCycleStatus = "GRACEFULLY_PAUSED" LifeCycleStatusUnhealthyJobManager LifeCycleStatus = "UNHEALTHY_JOB_MANAGER" LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY" LifeCycleStatusFailed LifeCycleStatus = "FAILED" diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 7d73248..0aa3448 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -10,11 +10,16 @@ import ( ) func (job *ManagedJob) Cycle() { - pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID))) + pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) // Init job if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" { - job.run() + job.run(false) + return + } + + if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused { + job.run(true) return } @@ -32,6 +37,10 @@ func (job *ManagedJob) Cycle() { return } + + if job.def.Status.JobStatus == v1alpha1.JobStatusCreating { + return + } // if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil { // //job.restore() // return diff --git a/internal/managed_job/pause.go b/internal/managed_job/pause.go new file mode 100644 index 0000000..89c699b --- /dev/null +++ b/internal/managed_job/pause.go @@ -0,0 +1,71 @@ +package managed_job + +import ( + "flink-kube-operator/internal/crd/v1alpha1" + "flink-kube-operator/pkg" + "os" + "time" + + api "github.com/logi-camp/go-flink-client" + "go.uber.org/zap" +) + +func (job *ManagedJob) pause() error { + var err error + if job.def.Status.JobId != nil { + result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false) + if stopJobErr != nil { + err = stopJobErr + pkg.Logger.Error("[managed-job] [pause] cannot stop job", zap.Error(err)) + return err + } + var savepointPath string + for { + trackResult, err := job.client.TrackSavepoint(*job.def.Status.JobId, result.RequestID) + time.Sleep(time.Millisecond * 500) + if err == nil && trackResult.Status.Id == api.SavepointStatusInCompleted { + if trackResult.Operation.Location != "" { + savepointPath = trackResult.Operation.Location + } + break + } + } + if savepointPath != "" { + job.def.Status.LastSavepointPath = &savepointPath + job.def.Status.PauseSavepointTriggerId = nil + job.def.Status.JobStatus = "" + job.def.Status.LastSavepointPath = &savepointPath + lastSavepointDate := time.Now() + job.def.Status.LastSavepointDate = &lastSavepointDate + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "jobStatus": "FINISHED", + "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefullyPaused, + "savepointTriggerId": nil, + "lastSavepointPath": savepointPath, + "lastSavepointDate": lastSavepointDate.Format(time.RFC3339), + }, + }) + pkg.Logger.Info( + "[managed-job] job paused successfully", + zap.String("jobName", job.def.GetName()), + zap.String("savepointPath", savepointPath), + ) + } else { + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefulStopFailed, + "lastSavepointPath": savepointPath, + "lastSavepointDate": time.Now().Format(time.RFC3339), + }, + }) + pkg.Logger.Error( + "[managed-job] error in pausing job", + zap.Error(err), + ) + + return err + } + } + return nil +} diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index aebc3b2..01adae7 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -12,22 +12,29 @@ import ( ) // run the job from savepoint and jarId in managedJob -func (job *ManagedJob) run() error { +func (job *ManagedJob) run(restoreMode bool) error { var savepointPath string if job.def.Status.LastSavepointPath == nil { pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) - //return v1alpha1.ErrNoSavepointPath + if restoreMode { + return v1alpha1.ErrNoSavepointPath + } } else { savepointPath = *job.def.Status.LastSavepointPath } - pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath)) + pkg.Logger.Info( + "[managed-job] [restore] starting job...", + zap.Bool("restoreMode", restoreMode), + zap.String("name", job.def.GetName()), + zap.String("savepointPath", savepointPath), + ) var jobId *string for { shouldUpload := false if job.def.Status.JarId == nil { err := v1alpha1.ErrNoJarId - pkg.Logger.Error("[managed-job] [run]", zap.Error(err)) + pkg.Logger.Warn("[managed-job] [run] will upload new jar...", zap.Error(err)) shouldUpload = true } else { runJarResp, err := job.client.RunJar(api.RunOpts{ diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index 0eb4058..bc4beac 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -23,7 +23,7 @@ func (job ManagedJob) createSavepoint() error { pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) return err } - pkg.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp)) + pkg.Logger.Debug("[managed-job] [savepoint] savepoint created successfully", zap.String("trigger-id", resp.RequestID)) job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ diff --git a/internal/managed_job/upgrade.go b/internal/managed_job/upgrade.go index 4f5c494..08f5873 100644 --- a/internal/managed_job/upgrade.go +++ b/internal/managed_job/upgrade.go @@ -1,28 +1,38 @@ package managed_job import ( - "flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/pkg" "go.uber.org/zap" ) func (job *ManagedJob) upgrade() { - if job.def.Status.LastSavepointPath != nil { - pkg.Logger.Info("upgrading job ", - zap.String("jobName", job.def.GetName()), - zap.String("currentJarURI", job.def.Spec.JarURI), - zap.String("prevJarURI", *job.def.Status.RunningJarURI), - ) - job.run() - } else { - err := "There is no savepoint path existing" - pkg.Logger.Error(err) - job.crd.Patch(job.def.UID, map[string]interface{}{ - "status": map[string]interface{}{ - "lifeCycleStatus": v1alpha1.LifeCycleStatusUpgradeFailed, - "error": err, - }, - }) + pkg.Logger.Info("[managed-job] [upgrade] pausing... ", + zap.String("jobName", job.def.GetName()), + zap.String("currentJarURI", job.def.Spec.JarURI), + zap.String("prevJarURI", *job.def.Status.RunningJarURI), + ) + job.def.Status.JarId = nil + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "jarId": job.def.Status.JarId, + }, + }) + err := job.pause() + if err != nil { + pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err)) + return + } + pkg.Logger.Info("[managed-job] [upgrade] restoring... ", + zap.String("jobName", job.def.GetName()), + zap.String("currentJarURI", job.def.Spec.JarURI), + zap.String("prevJarURI", *job.def.Status.RunningJarURI), + zap.Error(err), + ) + + err = job.run(true) + if err != nil { + pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err)) + return } } diff --git a/internal/managed_job/upload.go b/internal/managed_job/upload.go index f39911f..fb2ebb3 100644 --- a/internal/managed_job/upload.go +++ b/internal/managed_job/upload.go @@ -12,19 +12,19 @@ import ( func (job *ManagedJob) upload() error { jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) if err != nil { - pkg.Logger.Debug("[main] error on download jar", zap.Error(err)) + pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err)) return err } jarId, err := jarFile.Upload(job.client) if err != nil { - pkg.Logger.Debug("[main] error on upload jar", zap.Error(err)) + pkg.Logger.Debug("[manage-job] [upload] error on upload jar", zap.Error(err)) return err } err = jarFile.Delete() if err != nil { - pkg.Logger.Debug("[main] error on delete jar", zap.Error(err)) + pkg.Logger.Debug("[manage-job] [upload] error on delete jar", zap.Error(err)) } - pkg.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", jarId)) + pkg.Logger.Info("[manage-job] [upload] uploaded", zap.Any("upload-jar-resp", jarId)) job.def.Status.JarId = &jarId job.crd.Patch(job.def.UID, map[string]interface{}{ diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 52b412c..2eeccf2 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -15,18 +15,21 @@ import ( ) type Manager struct { - client *api.Client - managedJobs map[types.UID]managed_job.ManagedJob + client *api.Client + managedJobs map[types.UID]managed_job.ManagedJob + processingJobsIds []types.UID } func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) mgr := Manager{ - client: client, - managedJobs: map[types.UID]managed_job.ManagedJob{}, + client: client, + managedJobs: map[types.UID]managed_job.ManagedJob{}, + processingJobsIds: []types.UID{}, } + mgr.cycle(client, crdInstance) go func() { for { select { @@ -47,7 +50,6 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { pkg.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError)) crdInstance.PatchAll(map[string]interface{}{ "status": map[string]interface{}{ - "jobStatus": "", "lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager, }, }) @@ -56,8 +58,14 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { + if lo.Contains(mgr.processingJobsIds, uid) { + pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) + continue + } + // Get job definition from Kubernetes CRD def := crd.GetJob(uid) + mgr.processingJobsIds = append(mgr.processingJobsIds, uid) // Check if job exists in manager managed jobs managedJob, ok := mgr.managedJobs[uid] @@ -80,21 +88,24 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { }) if ok { pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) - var jobLifeCycleStatus *string + patchStatusObj := map[string]interface{}{ + "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), + } if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) { status := string(v1alpha1.LifeCycleStatusHealthy) - jobLifeCycleStatus = &status + patchStatusObj["lifeCycleStatus"] = &status } crdInstance.Patch(uid, map[string]interface{}{ - "status": map[string]interface{}{ - "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), - "lifeCycleStatus": jobLifeCycleStatus, - }, + "status": patchStatusObj, }) } managedJob.Cycle() mgr.managedJobs[uid] = managedJob + + mgr.processingJobsIds = lo.Filter(mgr.processingJobsIds, func(current types.UID, i int) bool { + return current != uid + }) } }