65 lines
1.9 KiB
Go
65 lines
1.9 KiB
Go
package crd
|
|
|
|
import (
|
|
"context"
|
|
"flink-kube-operator/internal/crd/v1alpha1"
|
|
"flink-kube-operator/pkg"
|
|
|
|
"github.com/reactivex/rxgo/v2"
|
|
"go.uber.org/zap"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
)
|
|
|
|
var FinalizerChannel chan (types.UID) = make(chan (types.UID))
|
|
|
|
func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
|
|
|
|
finalizerName := "flink-operator.logicamp.tech/finalizer"
|
|
for j := range jobEventObservable.Observe() {
|
|
go func() {
|
|
|
|
jobEvent := j.V.(*FlinkJobCrdEvent)
|
|
|
|
if jobEvent.Job.GetDeletionTimestamp() != nil {
|
|
// Resource is being deleted
|
|
if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
|
|
// Perform cleanup
|
|
pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
|
|
if err := crd.cleanupResources(jobEvent.Job); err != nil {
|
|
pkg.Logger.Info("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Remove finalizer
|
|
controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
|
|
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
|
|
pkg.Logger.Info("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
|
|
return
|
|
}
|
|
pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))
|
|
|
|
}
|
|
return
|
|
}
|
|
|
|
// Add finalizer if not present
|
|
if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
|
|
controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
|
|
pkg.Logger.Debug("[finalizer] adding job")
|
|
// Update the resource to add the finalizer
|
|
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
|
|
pkg.Logger.Info("[finalizer] failed to add", zap.Error(err))
|
|
return
|
|
}
|
|
}
|
|
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (crd Crd) cleanupResources(job *v1alpha1.FlinkJob) error {
|
|
FinalizerChannel <- job.GetUID()
|
|
return nil
|
|
}
|