From 55dbe9f8c2afbb5b7af8b99c09519dbcfaef72dd Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Tue, 18 Feb 2025 23:08:19 +0330 Subject: [PATCH] feat: add debug logs --- internal/crd/finalizer.go | 5 ++--- internal/crd/watch.go | 14 ++++++++------ internal/managed_job/cycle.go | 2 +- internal/managed_job/manager.go | 5 +++-- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/internal/crd/finalizer.go b/internal/crd/finalizer.go index 8b775dc..b556852 100644 --- a/internal/crd/finalizer.go +++ b/internal/crd/finalizer.go @@ -17,10 +17,9 @@ func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) { finalizerName := "flink-operator.logicamp.tech/finalizer" for j := range jobEventObservable.Observe() { + jobEvent := j.V.(*FlinkJobCrdEvent) + pkg.Logger.Debug("[crd] [manage-finalizer] main loop", zap.String("name", jobEvent.Job.Name)) go func() { - - jobEvent := j.V.(*FlinkJobCrdEvent) - if jobEvent.Job.GetDeletionTimestamp() != nil { // Resource is being deleted if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 3c56b7f..94c41c2 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -41,12 +41,14 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable { continue } - ch <- rxgo.Item{ - V: &FlinkJobCrdEvent{ - EventType: event.Type, - Job: job, - }, - } + go func() { + ch <- rxgo.Item{ + V: &FlinkJobCrdEvent{ + EventType: event.Type, + Job: job, + }, + } + }() pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name)) switch event.Type { case watch.Bookmark: diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 5417eb8..707c900 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -51,5 +51,5 @@ func (job *ManagedJob) Cycle() { // return // } - pkg.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus))) + pkg.Logger.Warn("[managed-job] [cycle] unhanded job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus))) } diff --git a/internal/managed_job/manager.go b/internal/managed_job/manager.go index e16b664..4227691 100644 --- a/internal/managed_job/manager.go +++ b/internal/managed_job/manager.go @@ -71,11 +71,10 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { }, }) } - //pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) + pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobManagerJobOverviews)) // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { - // pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds)) if lo.Contains(mgr.processingJobsIds, uid) { pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) continue @@ -97,6 +96,8 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { } + pkg.Logger.Debug("[manager] [cycle] finding job", zap.Any("name", managedJob.def.Name)) + jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool { jobId := managedJob.GetJobId() if jobId != nil {