diff --git a/internal/crd/new.go b/internal/crd/new.go index 9fd55d9..a0d3c2a 100644 --- a/internal/crd/new.go +++ b/internal/crd/new.go @@ -4,6 +4,7 @@ import ( "flink-kube-operator/internal/crd/v1alpha1" "os" + "github.com/reactivex/rxgo/v2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -48,11 +49,13 @@ func New() *Crd { runtimeClient: runtimeClient, } - // Watch for FlinkJob creation - jobEventObservable := crd.watchFlinkJobs() + jobEventCh := make(chan rxgo.Item) // add finalizer to new resources - go crd.manageFinalizer(jobEventObservable) + go crd.manageFinalizer(rxgo.FromChannel(jobEventCh)) + + // Watch for FlinkJob creation + crd.watchFlinkJobs(jobEventCh) return &crd } diff --git a/internal/crd/watch.go b/internal/crd/watch.go index b2aa5b1..164c665 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -15,9 +15,7 @@ import ( "k8s.io/apimachinery/pkg/watch" ) -func (crd Crd) watchFlinkJobs() rxgo.Observable { - - ch := make(chan rxgo.Item) +func (crd Crd) watchFlinkJobs(ch chan rxgo.Item) rxgo.Observable { go func() { pkg.Logger.Debug("[crd] starting watch")