fix: resolve missing restrict resources namespace

This commit is contained in:
Mohamad Khani 2025-03-04 23:55:28 +03:30
parent 75d0557286
commit 346f69100c
4 changed files with 15 additions and 10 deletions

View File

@ -2,6 +2,7 @@ package crd
import ( import (
"flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/internal/crd/v1alpha1"
"os"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
@ -12,7 +13,7 @@ import (
) )
type Crd struct { type Crd struct {
client dynamic.NamespaceableResourceInterface client dynamic.ResourceInterface
runtimeClient client.Client runtimeClient client.Client
} }
@ -32,12 +33,12 @@ func New() *Crd {
panic(err) panic(err)
} }
shema := runtime.NewScheme() scheme := runtime.NewScheme()
v1alpha1.AddKnownTypes(shema) v1alpha1.AddKnownTypes(scheme)
// Get FlinkJob resource interface // Get FlinkJob resource interface
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR) flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR).Namespace(os.Getenv("NAMESPACE"))
runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{ runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{
Scheme: shema, Scheme: scheme,
}) })
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -23,7 +23,6 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error
// Patch the status sub-resource // Patch the status sub-resource
unstructuredJob, err := crd.client. unstructuredJob, err := crd.client.
Namespace(job.GetNamespace()).
Patch( Patch(
context.Background(), context.Background(),
job.GetName(), job.GetName(),

View File

@ -21,23 +21,28 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
go func() { go func() {
pkg.Logger.Debug("[crd] starting watch") pkg.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Namespace(os.Getenv("NAMESPACE")).Watch(context.Background(), metaV1.ListOptions{}) watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer watcher.Stop() defer watcher.Stop()
namespace := os.Getenv("NAMESPACE")
pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
for event := range watcher.ResultChan() { for event := range watcher.ResultChan() {
unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil { if err != nil {
pkg.Logger.Error("cannot create unstructured map", zap.Error(err)) pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
continue continue
} }
job := &v1alpha1.FlinkJob{} job := &v1alpha1.FlinkJob{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
if err != nil { if err != nil {
pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err)) pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
continue
}
if job.Namespace != namespace {
continue continue
} }

View File

@ -51,5 +51,5 @@ func (job *ManagedJob) Cycle() {
// return // return
// } // }
pkg.Logger.Warn("[managed-job] [cycle] unhanded job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus))) pkg.Logger.Warn("[managed-job] [cycle] unhandled job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus)), zap.String("namespace", job.def.Namespace))
} }