fix: resolve finalize is not working in some cases

This commit is contained in:
Mohamad Khani 2025-03-09 02:17:59 +03:30
parent 346f69100c
commit b33dc0ba1d
2 changed files with 7 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import (
"flink-kube-operator/internal/crd/v1alpha1"
"os"
"github.com/reactivex/rxgo/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
@ -48,11 +49,13 @@ func New() *Crd {
runtimeClient: runtimeClient,
}
// Watch for FlinkJob creation
jobEventObservable := crd.watchFlinkJobs()
jobEventCh := make(chan rxgo.Item)
// add finalizer to new resources
go crd.manageFinalizer(jobEventObservable)
go crd.manageFinalizer(rxgo.FromChannel(jobEventCh))
// Watch for FlinkJob creation
crd.watchFlinkJobs(jobEventCh)
return &crd
}

View File

@ -15,9 +15,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
func (crd Crd) watchFlinkJobs() rxgo.Observable {
ch := make(chan rxgo.Item)
func (crd Crd) watchFlinkJobs(ch chan rxgo.Item) rxgo.Observable {
go func() {
pkg.Logger.Debug("[crd] starting watch")