package crd import ( "context" "flink-kube-operator/internal/crd/v1alpha1" "fmt" "gitea.com/logicamp/lc" "go.uber.org/zap" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) func (crd Crd) watchFlinkJobs() { lc.Logger.Debug("[crd] starting watch") watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) if err != nil { panic(err) } defer watcher.Stop() for event := range watcher.ResultChan() { lc.Logger.Debug("[crd] event received", zap.Any("type", event.Type)) unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) if err != nil { lc.Logger.Error("cannot create unstructured map", zap.Error(err)) continue } job := &v1alpha1.FlinkJob{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) if err != nil { lc.Logger.Error("cannot convert unstructured to structured", zap.Error(err)) continue } switch event.Type { case watch.Bookmark: case watch.Modified: fmt.Printf("FlinkJob updated: %s\n", job.GetName()) // Handle the new FlinkJob handleNewFlinkJob(job) case watch.Added: fmt.Printf("New FlinkJob created: %s\n", job.GetName()) // Handle the new FlinkJob handleNewFlinkJob(job) } } } func handleNewFlinkJob(job *v1alpha1.FlinkJob) { // Extract job details name := job.GetName() namespace := job.GetNamespace() // Process job specification fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind) lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job)) // Add your custom logic here }