package crd import ( "context" "flink-kube-operator/internal/crd/v1alpha1" "os" "flink-kube-operator/pkg" "github.com/reactivex/rxgo/v2" "go.uber.org/zap" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) func (crd Crd) watchFlinkJobs() rxgo.Observable { ch := make(chan rxgo.Item) go func() { pkg.Logger.Debug("[crd] starting watch") watcher, err := crd.client.Namespace(os.Getenv("NAMESPACE")).Watch(context.Background(), metaV1.ListOptions{}) if err != nil { panic(err) } defer watcher.Stop() for event := range watcher.ResultChan() { unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) if err != nil { pkg.Logger.Error("cannot create unstructured map", zap.Error(err)) continue } job := &v1alpha1.FlinkJob{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) if err != nil { pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err)) continue } go func() { ch <- rxgo.Item{ V: &FlinkJobCrdEvent{ EventType: event.Type, Job: job, }, } }() pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name)) switch event.Type { case watch.Bookmark: case watch.Modified: //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) crd.repsert(job) case watch.Added: //pkg.Logger.Info("[crd] [watch] new flink job created") crd.repsert(job) case watch.Deleted: crd.remove(job.UID) } } }() return rxgo.FromChannel(ch) }