From 4d6b06efe770b8a7b96e0ef4c1927e7f79910db8 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Wed, 12 Mar 2025 23:27:40 +0330 Subject: [PATCH] fix: restore watcher if channel is closed --- internal/crd/watch.go | 86 ++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 417c762..53abe7a 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -17,52 +17,54 @@ import ( func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) { go func() { - pkg.Logger.Debug("[crd] starting watch") - watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) - if err != nil { - panic(err) - } - defer watcher.Stop() - namespace := os.Getenv("NAMESPACE") - pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace)) - for event := range watcher.ResultChan() { - unstructuredJob := event.Object.(*unstructured.Unstructured) - unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) + for { + pkg.Logger.Debug("[crd] starting watch") + watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{}) if err != nil { - pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err)) - continue + panic(err) } - job := &v1alpha1.FlinkJob{} - - err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) - if err != nil { - pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err)) - continue - } - if job.Namespace != namespace { - continue - } - - go func() { - ch <- FlinkJobCrdEvent{ - EventType: event.Type, - Job: job, + namespace := os.Getenv("NAMESPACE") + pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace)) + for event := range watcher.ResultChan() { + unstructuredJob := event.Object.(*unstructured.Unstructured) + unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) + if err != nil { + pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err)) + continue } - }() - pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type))) - switch event.Type { - case watch.Bookmark: - case watch.Modified: - //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) - crd.repsert(job) - case watch.Added: - //pkg.Logger.Info("[crd] [watch] new flink job created") - crd.repsert(job) - case watch.Deleted: - crd.remove(job.UID) - } - } + job := &v1alpha1.FlinkJob{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job) + if err != nil { + pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err)) + continue + } + if job.Namespace != namespace { + continue + } + + go func() { + ch <- FlinkJobCrdEvent{ + EventType: event.Type, + Job: job, + } + }() + pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type))) + switch event.Type { + case watch.Bookmark: + case watch.Modified: + //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) + crd.repsert(job) + case watch.Added: + //pkg.Logger.Info("[crd] [watch] new flink job created") + crd.repsert(job) + case watch.Deleted: + crd.remove(job.UID) + } + } + defer watcher.Stop() + pkg.Logger.Warn("[crd] [watch] Watcher stopped, restarting...") + } }() }