diff --git a/internal/managed_job/manager.go b/internal/managed_job/manager.go index 599a413..e16b664 100644 --- a/internal/managed_job/manager.go +++ b/internal/managed_job/manager.go @@ -86,8 +86,8 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { mgr.processingJobsIds = append(mgr.processingJobsIds, uid) // Check if job exists in manager managed jobs - managedJob, ok := mgr.managedJobs[uid] - if ok { + managedJob, jobFound := mgr.managedJobs[uid] + if jobFound { managedJob.Update(def) } else { // Add job to manager managed job @@ -97,14 +97,14 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { } - jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { + jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil { return job.ID == *jobId } return false }) - if ok { + if jobFound { // pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) patchStatusObj := map[string]interface{}{ "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), @@ -114,6 +114,15 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { patchStatusObj["lifeCycleStatus"] = &status } + crdInstance.Patch(uid, map[string]interface{}{ + "status": patchStatusObj, + }) + } else { + patchStatusObj := map[string]interface{}{ + "jobStatus": "", + "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed), + } + crdInstance.Patch(uid, map[string]interface{}{ "status": patchStatusObj, })