package manager import ( "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/internal/managed_job" "time" "flink-kube-operator/pkg" api "github.com/logi-camp/go-flink-client" "github.com/samber/lo" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" ) type Manager struct { client *api.Client managedJobs map[types.UID]managed_job.ManagedJob } func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) mgr := Manager{ client: client, managedJobs: map[types.UID]managed_job.ManagedJob{}, } go func() { for { select { case <-ticker.C: mgr.cycle(client, crdInstance) case <-quit: ticker.Stop() return } } }() return mgr } func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { jobManagerJobOverviews, jobManagerJobStatusError := mgr.client.JobsOverview() if jobManagerJobStatusError != nil { pkg.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError)) crdInstance.PatchAll(map[string]interface{}{ "status": map[string]interface{}{ "jobStatus": "", "lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager, }, }) } //pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { // Get job definition from Kubernetes CRD def := crd.GetJob(uid) // Check if job exists in manager managed jobs managedJob, ok := mgr.managedJobs[uid] if ok { managedJob.Update(def) } else { // Add job to manager managed job managedJob = *managed_job.NewManagedJob(client, def, crdInstance) } if jobManagerJobStatusError != nil { } jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil { return job.ID == *jobId } return false }) if ok { pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) var jobLifeCycleStatus *string if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) { status := string(v1alpha1.LifeCycleStatusHealthy) jobLifeCycleStatus = &status } crdInstance.Patch(uid, map[string]interface{}{ "status": map[string]interface{}{ "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), "lifeCycleStatus": jobLifeCycleStatus, }, }) } managedJob.Cycle() mgr.managedJobs[uid] = managedJob } }