diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index cce4e60..5ef872d 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -7,9 +7,8 @@ import ( //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE type FlinkJobSpec struct { - Name string `json:"name"` - Parallelism int `json:"parallelism"` - Parallelism2 int `json:"parallelism2"` + Name string `json:"name"` + Parallelism int `json:"parallelism"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 6a52b1d..050f54d 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -2,28 +2,46 @@ package crd import ( "context" + "flink-kube-operator/internal/crd/v1alpha1" "fmt" "gitea.com/logicamp/lc" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) func (crd Crd) watchFlinkJobs() { lc.Logger.Debug("[crd] starting watch") - watcher, err := crd.client.Watch(context.Background(), metav1.ListOptions{}) + watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) if err != nil { panic(err) } defer watcher.Stop() for event := range watcher.ResultChan() { - lc.Logger.Debug("[crd] event received") + lc.Logger.Debug("[crd] event received", zap.Any("type", event.Type)) + unstructuredJob := event.Object.(*unstructured.Unstructured) + unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) + if err != nil { + lc.Logger.Error("cannot create unstructured map", zap.Error(err)) + continue + } + job := &v1alpha1.FlinkJob{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) + if err != nil { + lc.Logger.Error("cannot convert unstructured to structured", zap.Error(err)) + continue + } switch event.Type { + case watch.Bookmark: + case watch.Modified: + fmt.Printf("FlinkJob updated: %s\n", job.GetName()) + // Handle the new FlinkJob + handleNewFlinkJob(job) case watch.Added: - job := event.Object.(*unstructured.Unstructured) fmt.Printf("New FlinkJob created: %s\n", job.GetName()) // Handle the new FlinkJob handleNewFlinkJob(job) @@ -31,21 +49,14 @@ func (crd Crd) watchFlinkJobs() { } } -func handleNewFlinkJob(job *unstructured.Unstructured) { +func handleNewFlinkJob(job *v1alpha1.FlinkJob) { // Extract job details name := job.GetName() namespace := job.GetNamespace() - // Get specific fields using unstructured.Unstructured methods - spec, found, err := unstructured.NestedMap(job.Object, "spec") - if err != nil || !found { - fmt.Printf("Error getting spec for job %s: %v\n", name, err) - return - } - // Process job specification - fmt.Printf("Processing FlinkJob %s in namespace %s\n", name, namespace) + fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind) - lc.Logger.Debug("[crd] [watch]", zap.Any("spec", spec)) + lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job)) // Add your custom logic here }