package manager import ( "flink-kube-operator/internal/crd" "flink-kube-operator/internal/managed_job" "time" "gitea.com/logicamp/lc" api "github.com/logi-camp/go-flink-client" "github.com/samber/lo" "github.com/tidwall/buntdb" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" ) type Manager struct { client *api.Client managedJobs map[types.UID]managed_job.ManagedJob } func NewManager(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) Manager { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) mgr := Manager{ client: client, managedJobs: map[types.UID]managed_job.ManagedJob{}, } go func() { for { select { case <-ticker.C: mgr.cycle(client, db, crdInstance) case <-quit: ticker.Stop() return } } }() return mgr } func (mgr *Manager) cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { jobsOverviews, err := mgr.client.JobsOverview() if err != nil { lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err)) } //lc.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) for _, uid := range crd.GetAllJobKeys() { def := crd.GetJob(uid) managedJob, ok := mgr.managedJobs[uid] if ok { managedJob.Update(def) } else { managedJob = *managed_job.NewManagedJob(client, db, def, crdInstance) //mgr.managedJobs[uid] = managedJob } jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil { return job.ID == *jobId } return false }) if ok { lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State)) managedJob.SetStatus(managed_job.JobStatus(jobOverview.State)) } managedJob.Cycle() mgr.managedJobs[uid] = managedJob } }