package crd import ( "context" "flink-kube-operator/internal/crd/v1alpha1" "os" "flink-kube-operator/pkg" "go.uber.org/zap" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) { go func() { pkg.Logger.Debug("[crd] starting watch") watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) if err != nil { panic(err) } defer watcher.Stop() namespace := os.Getenv("NAMESPACE") pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace)) for event := range watcher.ResultChan() { unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) if err != nil { pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err)) continue } job := &v1alpha1.FlinkJob{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) if err != nil { pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err)) continue } if job.Namespace != namespace { continue } go func() { ch <- FlinkJobCrdEvent{ EventType: event.Type, Job: job, } }() pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type))) switch event.Type { case watch.Bookmark: case watch.Modified: //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) crd.repsert(job) case watch.Added: //pkg.Logger.Info("[crd] [watch] new flink job created") crd.repsert(job) case watch.Deleted: crd.remove(job.UID) } } }() }