From 346f69100c2207508c42d8e0cd42037a0aec8ffd Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Tue, 4 Mar 2025 23:55:28 +0330 Subject: [PATCH] fix: resolve missing restrict resources namespace --- internal/crd/new.go | 11 ++++++----- internal/crd/patch.go | 1 - internal/crd/watch.go | 11 ++++++++--- internal/managed_job/cycle.go | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/internal/crd/new.go b/internal/crd/new.go index b64cb88..9fd55d9 100644 --- a/internal/crd/new.go +++ b/internal/crd/new.go @@ -2,6 +2,7 @@ package crd import ( "flink-kube-operator/internal/crd/v1alpha1" + "os" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" @@ -12,7 +13,7 @@ import ( ) type Crd struct { - client dynamic.NamespaceableResourceInterface + client dynamic.ResourceInterface runtimeClient client.Client } @@ -32,12 +33,12 @@ func New() *Crd { panic(err) } - shema := runtime.NewScheme() - v1alpha1.AddKnownTypes(shema) + scheme := runtime.NewScheme() + v1alpha1.AddKnownTypes(scheme) // Get FlinkJob resource interface - flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR) + flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR).Namespace(os.Getenv("NAMESPACE")) runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{ - Scheme: shema, + Scheme: scheme, }) if err != nil { panic(err) diff --git a/internal/crd/patch.go b/internal/crd/patch.go index 4d7da40..0fc0f12 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -23,7 +23,6 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error // Patch the status sub-resource unstructuredJob, err := crd.client. - Namespace(job.GetNamespace()). Patch( context.Background(), job.GetName(), diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 94c41c2..b2aa5b1 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -21,23 +21,28 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable { go func() { pkg.Logger.Debug("[crd] starting watch") - watcher, err := crd.client.Namespace(os.Getenv("NAMESPACE")).Watch(context.Background(), metaV1.ListOptions{}) + watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) if err != nil { panic(err) } defer watcher.Stop() + namespace := os.Getenv("NAMESPACE") + pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace)) for event := range watcher.ResultChan() { unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) if err != nil { - pkg.Logger.Error("cannot create unstructured map", zap.Error(err)) + pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err)) continue } job := &v1alpha1.FlinkJob{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) if err != nil { - pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err)) + pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err)) + continue + } + if job.Namespace != namespace { continue } diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 707c900..1976a34 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -51,5 +51,5 @@ func (job *ManagedJob) Cycle() { // return // } - pkg.Logger.Warn("[managed-job] [cycle] unhanded job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus))) + pkg.Logger.Warn("[managed-job] [cycle] unhandled job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus)), zap.String("namespace", job.def.Namespace)) }