diff --git a/cmd/operator/main.go b/cmd/operator/main.go index aaece7e..a6bd4fa 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -2,7 +2,7 @@ package main import ( "flink-kube-operator/internal/crd" - "flink-kube-operator/internal/manager" + "flink-kube-operator/internal/managed_job" "flink-kube-operator/internal/rest" "flink-kube-operator/pkg" "log" @@ -31,7 +31,7 @@ func main() { pkg.Logger.Info("[main]", zap.Any("cluster-config", clusterConfig)) // init flink job manager - manager.NewManager(c, crdInstance) + managed_job.NewManager(c, crdInstance) // for _, jobDef := range config.Jobs { // managed_job.NewManagedJob(c, db, jobDef) diff --git a/internal/crd/finalizer.go b/internal/crd/finalizer.go index dc9f599..517ae55 100644 --- a/internal/crd/finalizer.go +++ b/internal/crd/finalizer.go @@ -1,14 +1,60 @@ package crd import ( + "context" + "flink-kube-operator/internal/crd/v1alpha1" + "flink-kube-operator/pkg" + "github.com/reactivex/rxgo/v2" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) +var FinalizerChannel chan (types.UID) = make(chan (types.UID)) + func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) { + + finalizerName := "flink-operator.logicamp.tech/finalizer" for j := range jobEventObservable.Observe() { jobEvent := j.V.(*FlinkJobCrdEvent) - //pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName())) - controllerutil.AddFinalizer(jobEvent.Job, "") + + if jobEvent.Job.GetDeletionTimestamp() != nil { + // Resource is being deleted + if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { + // Perform cleanup + pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName())) + if err := crd.cleanupResources(jobEvent.Job); err != nil { + pkg.Logger.Error("[crd] [manage-finalizer] cleanup failed", zap.Error(err)) + return + } + + // Remove finalizer + controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName) + if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { + pkg.Logger.Error("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err)) + return + } + pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName())) + + } + return + } + + // Add finalizer if not present + if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { + controllerutil.AddFinalizer(jobEvent.Job, finalizerName) + pkg.Logger.Debug("[finalizer] adding job") + // Update the resource to add the finalizer + if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { + pkg.Logger.Error("[finalizer] failed to add", zap.Error(err)) + return + } + } } } + +func (crd Crd) cleanupResources(job *v1alpha1.FlinkJob) error { + FinalizerChannel <- job.GetUID() + return nil +} diff --git a/internal/crd/new.go b/internal/crd/new.go index fb8d2c1..b64cb88 100644 --- a/internal/crd/new.go +++ b/internal/crd/new.go @@ -3,36 +3,48 @@ package crd import ( "flink-kube-operator/internal/crd/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" ) type Crd struct { - client dynamic.NamespaceableResourceInterface + client dynamic.NamespaceableResourceInterface + runtimeClient client.Client } func New() *Crd { - // Get Kubernetes config - config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) + // Get Kubernetes config_ + config_, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { - config, err = rest.InClusterConfig() + config_, err = rest.InClusterConfig() if err != nil { panic(err) } } // Create dynamic client - dynamicClient, err := dynamic.NewForConfig(config) + dynamicClient, err := dynamic.NewForConfig(config_) if err != nil { panic(err) } + shema := runtime.NewScheme() + v1alpha1.AddKnownTypes(shema) // Get FlinkJob resource interface flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR) - + runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{ + Scheme: shema, + }) + if err != nil { + panic(err) + } crd := Crd{ - client: flinkJobClient, + client: flinkJobClient, + runtimeClient: runtimeClient, } // Watch for FlinkJob creation diff --git a/internal/crd/patch.go b/internal/crd/patch.go index 523c0f2..4d7da40 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -14,7 +14,7 @@ import ( func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error { job := GetJob(jobUid) - pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid)) + // pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid)) patchBytes, err := json.Marshal(patchData) if err != nil { diff --git a/internal/crd/repo.go b/internal/crd/repo.go index d96c94f..9ed7cec 100644 --- a/internal/crd/repo.go +++ b/internal/crd/repo.go @@ -14,6 +14,10 @@ func (crd *Crd) repsert(job *v1alpha1.FlinkJob) { jobs.Store(job.GetUID(), job) } +func (crd *Crd) remove(uid types.UID) { + jobs.Delete(uid) +} + func GetJob(uid types.UID) v1alpha1.FlinkJob { job, _ := jobs.Load(uid) return *job.DeepCopy() diff --git a/internal/crd/v1alpha1/register.go b/internal/crd/v1alpha1/register.go index 68de54d..c90c3ed 100644 --- a/internal/crd/v1alpha1/register.go +++ b/internal/crd/v1alpha1/register.go @@ -20,11 +20,11 @@ var FlinkJobGVR = schema.GroupVersionResource{ } var ( - SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes) AddToScheme = SchemeBuilder.AddToScheme ) -func addKnownTypes(scheme *runtime.Scheme) error { +func AddKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &FlinkJob{}, &FlinkJobList{}, diff --git a/internal/crd/watch.go b/internal/crd/watch.go index fd1de5c..c7ab767 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -56,6 +56,8 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable { //pkg.Logger.Info("[crd] [watch] new flink job created") crd.repsert(job) case watch.Deleted: + crd.remove(job.UID) + } } diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 57ad0e6..4d15b85 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -10,7 +10,7 @@ import ( ) func (job *ManagedJob) Cycle() { - pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) + // pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) // Init job if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" { diff --git a/internal/manager/manager.go b/internal/managed_job/manager.go similarity index 80% rename from internal/manager/manager.go rename to internal/managed_job/manager.go index a8b8954..386ed85 100644 --- a/internal/manager/manager.go +++ b/internal/managed_job/manager.go @@ -1,9 +1,8 @@ -package manager +package managed_job import ( "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" - "flink-kube-operator/internal/managed_job" "time" "flink-kube-operator/pkg" @@ -16,7 +15,7 @@ import ( type Manager struct { client *api.Client - managedJobs map[types.UID]managed_job.ManagedJob + managedJobs map[types.UID]ManagedJob processingJobsIds []types.UID } @@ -31,7 +30,7 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { quit := make(chan struct{}) mgr = Manager{ client: client, - managedJobs: map[types.UID]managed_job.ManagedJob{}, + managedJobs: map[types.UID]ManagedJob{}, processingJobsIds: []types.UID{}, } @@ -47,6 +46,15 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager { } } }() + + go func() { + for event := range crd.FinalizerChannel { + manager := mgr.GetJob(event) + manager.Stop() + delete(mgr.managedJobs, event) + } + }() + return mgr } @@ -64,7 +72,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { - pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds)) + // pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds)) if lo.Contains(mgr.processingJobsIds, uid) { pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) continue @@ -80,7 +88,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { managedJob.Update(def) } else { // Add job to manager managed job - managedJob = *managed_job.NewManagedJob(client, def, crdInstance) + managedJob = *NewManagedJob(client, def, crdInstance) } if jobManagerJobStatusError != nil { @@ -94,7 +102,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { return false }) if ok { - pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) + // pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State)) patchStatusObj := map[string]interface{}{ "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State), } @@ -117,10 +125,10 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { } } -func (mgr *Manager) GetJobs() map[types.UID]managed_job.ManagedJob { +func (mgr *Manager) GetJobs() map[types.UID]ManagedJob { return mgr.managedJobs } -func (mgr *Manager) GetJob(id types.UID) managed_job.ManagedJob { +func (mgr *Manager) GetJob(id types.UID) ManagedJob { return mgr.managedJobs[id] } diff --git a/internal/rest/controller/crd.go b/internal/rest/controller/crd.go index 854f122..3dca138 100644 --- a/internal/rest/controller/crd.go +++ b/internal/rest/controller/crd.go @@ -4,7 +4,7 @@ import ( "context" "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" - "flink-kube-operator/internal/manager" + "flink-kube-operator/internal/managed_job" "k8s.io/apimachinery/pkg/types" ) @@ -39,7 +39,7 @@ type StopJobResp struct { } func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { - mgr := manager.GetManager() + mgr := managed_job.GetManager() job := mgr.GetJob(types.UID(req.JobUId)) err := job.Stop() if err != nil { @@ -51,7 +51,7 @@ func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { } func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { - mgr := manager.GetManager() + mgr := managed_job.GetManager() job := mgr.GetJob(types.UID(req.JobUId)) err := job.Run(true) if err != nil { @@ -63,7 +63,7 @@ func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { } func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { - mgr := manager.GetManager() + mgr := managed_job.GetManager() job := mgr.GetJob(types.UID(req.JobUId)) job.RemoveJar() return &StopJobResp{Body: StopJobRespBody{ @@ -72,7 +72,7 @@ func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { } func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { - mgr := manager.GetManager() + mgr := managed_job.GetManager() job := mgr.GetJob(types.UID(req.JobUId)) job.Pause() return &StopJobResp{Body: StopJobRespBody{