package managed_job import ( "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" "time" "flink-kube-operator/pkg" api "github.com/logi-camp/go-flink-client" "github.com/samber/lo" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" ) type Manager struct { client *api.Client managedJobs map[types.UID]ManagedJob processingJobsIds []types.UID } var mgr Manager func GetManager() Manager { return mgr } func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) mgr = Manager{ client: client, managedJobs: map[types.UID]ManagedJob{}, processingJobsIds: []types.UID{}, } mgr.cycle(client, crdInstance) go func() { for { select { case <-ticker.C: mgr.cycle(client, crdInstance) case <-quit: ticker.Stop() return } } }() go func() { for event := range crd.FinalizerChannel { manager := mgr.GetJob(event) manager.Stop() delete(mgr.managedJobs, event) } }() return mgr } func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { jobManagerJobOverviews, jobManagerJobStatusError := mgr.client.JobsOverview() if jobManagerJobStatusError != nil { pkg.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError)) crdInstance.PatchAll(map[string]interface{}{ "status": map[string]interface{}{ "lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager, }, }) } //pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { // pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds)) if lo.Contains(mgr.processingJobsIds, uid) { pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) continue } // Get job definition from Kubernetes CRD def := crd.GetJob(uid) mgr.processingJobsIds = append(mgr.processingJobsIds, uid) // Check if job exists in manager managed jobs managedJob, ok := mgr.managedJobs[uid] if ok { managedJob.Update(def) } else { // Add job to manager managed job managedJob = *NewManagedJob(client, def, crdInstance) } if jobManagerJobStatusError != nil { } jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil { return job.ID == *jobId } return false }) if ok { // pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) patchStatusObj := map[string]interface{}{ "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), } if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) { status := string(v1alpha1.LifeCycleStatusHealthy) patchStatusObj["lifeCycleStatus"] = &status } crdInstance.Patch(uid, map[string]interface{}{ "status": patchStatusObj, }) } managedJob.Cycle() mgr.managedJobs[uid] = managedJob mgr.processingJobsIds = lo.Filter(mgr.processingJobsIds, func(current types.UID, i int) bool { return current != uid }) } } func (mgr *Manager) GetJobs() map[types.UID]ManagedJob { return mgr.managedJobs } func (mgr *Manager) GetJob(id types.UID) ManagedJob { return mgr.managedJobs[id] }