From 5abc044d693abe48e3dcbb567fb105db908e4991 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sun, 8 Dec 2024 00:47:53 +0330 Subject: [PATCH] feat: add restore statuses to kubernetes crd --- crds.yaml | 23 ++++++++++++++++++-- internal/crd/patch.go | 13 ++++++++++- internal/crd/v1alpha1/flink_job.go | 27 +++++++++++++---------- internal/managed_job/new.go | 1 - internal/managed_job/restore.go | 20 ++++++++++++----- internal/managed_job/run.go | 15 +++++++++++-- internal/managed_job/savepoint.go | 2 +- internal/manager/check_jobs_status.go | 8 ------- internal/manager/manager.go | 31 ++++++++++++++++++++------- 9 files changed, 101 insertions(+), 39 deletions(-) delete mode 100644 internal/manager/check_jobs_status.go diff --git a/crds.yaml b/crds.yaml index b3fa032..d5d3ccf 100644 --- a/crds.yaml +++ b/crds.yaml @@ -69,6 +69,8 @@ spec: type: string jobId: type: string + jarId: + type: string error: type: string lastSavepointPath: @@ -80,6 +82,14 @@ spec: lastSavepointDate: type: string format: time + lastRestoredSavepointDate: + type: string + format: time + lastRestoredSavepointRestoredDate: + type: string + format: time + restoredCount: + type: number additionalPrinterColumns: - name: Status type: string @@ -87,6 +97,15 @@ spec: - name: Age type: date jsonPath: .metadata.creationTimestamp - - name: LifeCycleStatus + - name: Life Cycle Status type: string - jsonPath: .status.lifeCycleStatus \ No newline at end of file + jsonPath: .status.lifeCycleStatus + - name: Last Savepoint + type: date + jsonPath: .status.lastSavepointDate + - name: Last Restored Savepoint + type: date + jsonPath: .status.lastRestoredSavepointDate + - name: Restored Count + type: number + jsonPath: .status.restoredCount diff --git a/internal/crd/patch.go b/internal/crd/patch.go index 4a4481b..1c8fa24 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -19,7 +19,7 @@ func (crd Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error { return fmt.Errorf("error 
marshaling patch data: %w", err) } - // Patch the status subresource + // Patch the status sub-resource unstructuredJob, err := crd.client. Namespace(job.GetNamespace()). Patch( @@ -49,3 +49,14 @@ func (crd Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error { } return nil } + +func (crd Crd) PatchAll(patchData map[string]interface{}) error { + keys := GetAllJobKeys() + for _, key := range keys { + err := crd.Patch(key, patchData) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 27cd2dc..31017e2 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -19,13 +19,17 @@ type FlinkJobSpec struct { } type FlinkJobStatus struct { - JobStatus JobStatus `json:"jobStatus,omitempty"` - LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"` - LastSavepointPath *string `json:"lastSavepointPath,omitempty"` - JobId *string `json:"jobId,omitempty"` - Error *string `json:"error,omitempty"` - SavepointTriggerId *string `json:"savepointTriggerId,omitempty"` - LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"` + JobStatus JobStatus `json:"jobStatus,omitempty"` + LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"` + LastSavepointPath *string `json:"lastSavepointPath,omitempty"` + JarId *string `json:"jarId,omitempty"` + JobId *string `json:"jobId,omitempty"` + Error *string `json:"error,omitempty"` + SavepointTriggerId *string `json:"savepointTriggerId,omitempty"` + LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"` + LastRestoredSavepointDate *time.Time `json:"lastRestoredSavepointDate,omitempty"` + LastRestoredSavepointRestoredDate *time.Time `json:"lastRestoredSavepointRestoredDate,omitempty"` + RestoredCount int `json:"restoredCount,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -69,8 +73,9 @@ var ( type LifeCycleStatus string const ( - 
LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING" - LifeCycleStatusRestoring LifeCycleStatus = "RESTORING" - LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY" - LifeCycleStatusFailed LifeCycleStatus = "FAILED" + LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING" + LifeCycleStatusRestoring LifeCycleStatus = "RESTORING" + LifeCycleStatusUnhealthyJobManager LifeCycleStatus = "UNHEALTHY_JOB_MANAGER" + LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY" + LifeCycleStatusFailed LifeCycleStatus = "FAILED" ) diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go index 36f564e..31a5e62 100644 --- a/internal/managed_job/new.go +++ b/internal/managed_job/new.go @@ -10,7 +10,6 @@ import ( type ManagedJob struct { def v1alpha1.FlinkJob client *api.Client - jarId string crd *crd.Crd } diff --git a/internal/managed_job/restore.go b/internal/managed_job/restore.go index 37e7a49..2b86cab 100644 --- a/internal/managed_job/restore.go +++ b/internal/managed_job/restore.go @@ -1,7 +1,9 @@ package managed_job import ( + "errors" "flink-kube-operator/internal/crd/v1alpha1" + "time" "gitea.com/logicamp/lc" api "github.com/logi-camp/go-flink-client" @@ -14,9 +16,14 @@ func (job *ManagedJob) restore() error { lc.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath)) return v1alpha1.ErrNoSavepointPath } + if job.def.Status.JarId == nil { + err := errors.New("missing jar id") + lc.Logger.Error("[managed-job] [restore]", zap.Error(err)) + return err + } lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.def.Status.LastSavepointPath)) runJarResp, err := job.client.RunJar(api.RunOpts{ - JarID: job.jarId, + JarID: *job.def.Status.JarId, AllowNonRestoredState: true, EntryClass: job.def.Spec.EntryClass, SavepointPath: *job.def.Status.LastSavepointPath, @@ -32,10 +39,13 @@ func (job *ManagedJob) restore() error { // job.def.Status.Error = nil job.crd.Patch(job.def.UID, map[string]interface{}{ "status": 
map[string]interface{}{ - "jobId": &runJarResp.JobId, - "jobStatus": v1alpha1.JobStatusCreating, - "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, - "error": nil, + "jobId": &runJarResp.JobId, + "jobStatus": v1alpha1.JobStatusCreating, + "lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring, + "lastRestoredSavepointDate": job.def.Status.LastRestoredSavepointDate, + "restoredCount": job.def.Status.RestoredCount + 1, + "lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339), + "error": nil, }, }) diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index f692f96..c5c2c50 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -1,6 +1,7 @@ package managed_job import ( + "errors" "flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/internal/jar" @@ -24,14 +25,24 @@ func (job *ManagedJob) upload() error { } lc.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", fileName)) - job.jarId = fileName + job.def.Status.JarId = &fileName + job.crd.Patch(job.def.UID, map[string]interface{}{ + "status": map[string]interface{}{ + "jarId": job.def.Status.JarId, + }, + }) return nil } // run the job from saved jarId in managedJob func (job *ManagedJob) run() error { + if job.def.Status.JarId == nil { + err := errors.New("missing jar id") + lc.Logger.Error("[managed-job] [run]", zap.Error(err)) + return err + } runJarResp, err := job.client.RunJar(api.RunOpts{ - JarID: job.jarId, + JarID: *job.def.Status.JarId, AllowNonRestoredState: true, EntryClass: job.def.Spec.EntryClass, }) diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index 07f9fb6..9cdd9ab 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -55,7 +55,7 @@ func (job ManagedJob) trackSavepoint() error { job.crd.Patch(job.def.UID, map[string]interface{}{ "status": map[string]interface{}{ "lastSavepointPath": resp.Operation.Location, - "lastSavepointDate": time.Now(), + 
"lastSavepointDate": time.Now().Format(time.RFC3339), }, }) } diff --git a/internal/manager/check_jobs_status.go b/internal/manager/check_jobs_status.go deleted file mode 100644 index b422bd6..0000000 --- a/internal/manager/check_jobs_status.go +++ /dev/null @@ -1,8 +0,0 @@ -package manager - -import api "github.com/logi-camp/go-flink-client" - -func (mgr *Manager) checkJobStatus(client *api.Client) error { - - return nil -} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 15a09d1..71a7e7e 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -41,22 +41,36 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { } func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { - jobsOverviews, err := mgr.client.JobsOverview() - if err != nil { - lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err)) + jobManagerJobOverviews, jobManagerJobStatusError := mgr.client.JobsOverview() + if jobManagerJobStatusError != nil { + lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError)) + crdInstance.PatchAll(map[string]interface{}{ + "status": map[string]interface{}{ + "jobStatus": "", + "lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager, + }, + }) } //lc.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) + // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { + // Get job definition from Kubernetes CRD def := crd.GetJob(uid) + + // Check if job exists in manager managed jobs managedJob, ok := mgr.managedJobs[uid] if ok { managedJob.Update(def) } else { + // Add job to manager managed job managedJob = *managed_job.NewManagedJob(client, def, crdInstance) - //mgr.managedJobs[uid] = managedJob } - jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool { + if jobManagerJobStatusError != nil { + + } + + jobManagerJobOverview, ok := 
lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil { return job.ID == *jobId @@ -64,20 +78,21 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { return false }) if ok { - lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State)) + lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) var jobLifeCycleStatus *string - if jobOverview.State == string(v1alpha1.JobStatusRunning) { + if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) { status := string(v1alpha1.LifeCycleStatusHealthy) jobLifeCycleStatus = &status } crdInstance.Patch(uid, map[string]interface{}{ "status": map[string]interface{}{ - "jobStatus": v1alpha1.JobStatus(jobOverview.State), + "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), "lifeCycleStatus": jobLifeCycleStatus, }, }) } + managedJob.Cycle() mgr.managedJobs[uid] = managedJob }