From 2c25323e625779f453e385514b5d692fc8dfd773 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sat, 7 Dec 2024 01:09:25 +0330 Subject: [PATCH] feat(manager): make check status from flink batch --- .vscode/launch.json | 16 ++++++++++ cmd/operator/main.go | 2 +- go.mod | 3 +- go.sum | 4 +++ internal/crd/event.go | 5 +-- internal/managed_job/cycle.go | 46 ++++++++++++--------------- internal/managed_job/new.go | 2 +- internal/managed_job/run.go | 8 ++++- internal/managed_job/state.go | 10 +++++- internal/managed_job/status.go | 2 +- internal/manager/check_jobs_status.go | 8 +++++ internal/manager/manager.go | 44 +++++++++++++++++++++---- 12 files changed, 109 insertions(+), 41 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 internal/manager/check_jobs_status.go diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..af96def --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Package", + "type": "go", + "request": "launch", + "mode": "auto", + "cwd": "${workspaceFolder}", + "program": "${workspaceFolder}/cmd/operator" + } + ] +} \ No newline at end of file diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 0618c71..7ee1b97 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -44,7 +44,7 @@ func main() { fmt.Println(clusterConfig) // init flink job manager - manager.Setup(c, db, crdInstance) + manager.NewManager(c, db, crdInstance) // for _, jobDef := range config.Jobs { // managed_job.NewManagedJob(c, db, jobDef) diff --git a/go.mod b/go.mod index 36f9d32..db20453 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.2 require ( gitea.com/logicamp/lc v1.14.6 - github.com/logi-camp/go-flink-client v0.1.3 + github.com/logi-camp/go-flink-client v0.2.0 github.com/matoous/go-nanoid/v2 v2.1.0 github.com/tidwall/buntdb v1.3.2 go.uber.org/zap v1.27.0 @@ -24,6 +24,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/samber/lo v1.47.0 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 388829b..26383f8 100644 --- a/go.sum +++ b/go.sum @@ -234,6 +234,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/logi-camp/go-flink-client v0.1.3 h1:YaVH0yJUIcZn8KPodNLXKuo2T394ph9XUEGUhU8+sDQ= github.com/logi-camp/go-flink-client v0.1.3/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s= +github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -329,6 +331,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE= github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= diff --git a/internal/crd/event.go b/internal/crd/event.go index b5c42bf..6e2f357 100644 --- a/internal/crd/event.go +++ b/internal/crd/event.go @@ -41,11 +41,12 @@ func (crd Crd) SetJobStatus(jobUid types.UID, status string) error { ) return err } - patched, err := convertFromUnstructured(unstructuredJob) + _, err = convertFromUnstructured(unstructuredJob) if err != nil { lc.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err)) + } else { + lc.Logger.Debug("[crd] [status] patched") } - lc.Logger.Debug("[crd] [status] set status", zap.Any("statusUpdateObj", patched)) if err != nil { lc.Logger.Error("[crd] [status] ", zap.Error(err)) return err diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 9cdbab9..9707f96 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -1,33 +1,32 @@ package managed_job import ( - "errors" "time" "gitea.com/logicamp/lc" "go.uber.org/zap" ) -func (job *ManagedJob) startCycle() { - ticker := time.NewTicker(5 * time.Second) - quit := make(chan struct{}) +// func (job *ManagedJob) startCycle() { +// ticker := time.NewTicker(5 * time.Second) +// quit := make(chan struct{}) - // load job state from db - job.loadState() - go func() { - for { - select { - case <-ticker.C: - job.cycle() - case <-quit: - ticker.Stop() - return - } - } - }() -} +// // load job state from db +// job.loadState() +// go func() { +// for { +// select { +// case <-ticker.C: +// job.cycle() +// case <-quit: +// ticker.Stop() +// return +// } +// } +// }() +// } -func (job *ManagedJob) cycle() { +func (job *ManagedJob) Cycle() { lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID))) // Init job @@ -47,20 +46,15 @@ func (job *ManagedJob) cycle() { job.crd.SetJobStatus(job.def.UID, string(job.state.Status)) // Check for set running or error state - if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing { + /* if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing { err := job.checkStatus() if errors.Is(err, ErrNoJobId) { job.state = nil } return - } + } */ if job.state.Status == JobStatusRunning { - err := job.checkStatus() - if errors.Is(err, ErrNoJobId) { - job.state = nil - } - lc.Logger.Debug("savepoint interval", zap.Any("savepoint duration", job.def.Spec.SavepointInterval)) if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.state.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate)) { if job.state.SavepointTriggerId == nil { job.createSavepoint() diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go index 3a2a4bd..b5c8a29 100644 --- a/internal/managed_job/new.go +++ b/internal/managed_job/new.go @@ -24,7 +24,7 @@ func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob, crd db: db, crd: crd, } - job.startCycle() + //job.startCycle() return job } diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index af220e4..5e2e6cb 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -40,7 +40,13 @@ func (job *ManagedJob) run() error { } lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) - job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating}) + if job.state == nil { + job.state = &jobState{} + } + job.state.JobId = &runJarResp.JobId + job.state.Status = JobStatusCreating + job.updateState(*job.state) + //job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating}) return err } diff --git a/internal/managed_job/state.go b/internal/managed_job/state.go index 68ca259..1981645 100644 --- a/internal/managed_job/state.go +++ b/internal/managed_job/state.go @@ -57,7 +57,15 @@ func (job *ManagedJob) removeSavepointTriggerId() { job.updateState(*job.state) } -func (job *ManagedJob) setStatus(status JobStatus) { +func (job *ManagedJob) SetStatus(status JobStatus) { job.state.Status = status job.updateState(*job.state) } + +func (job *ManagedJob) GetJobId() *string { + if job.state != nil && job.state.JobId != nil { + return job.state.JobId + } else { + return nil + } +} diff --git a/internal/managed_job/status.go b/internal/managed_job/status.go index 274cad6..f42226c 100644 --- a/internal/managed_job/status.go +++ b/internal/managed_job/status.go @@ -24,6 +24,6 @@ func (job *ManagedJob) checkStatus() error { return err } //lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp)) - job.setStatus(JobStatus(statusResp.State)) + job.SetStatus(JobStatus(statusResp.State)) return err } diff --git a/internal/manager/check_jobs_status.go b/internal/manager/check_jobs_status.go new file mode 100644 index 0000000..b422bd6 --- /dev/null +++ b/internal/manager/check_jobs_status.go @@ -0,0 +1,8 @@ +package manager + +import api "github.com/logi-camp/go-flink-client" + +func (mgr *Manager) checkJobStatus(client *api.Client) error { + + return nil +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 426b8d4..e26b64d 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -5,39 +5,69 @@ import ( "flink-kube-operator/internal/managed_job" "time" + "gitea.com/logicamp/lc" api "github.com/logi-camp/go-flink-client" + "github.com/samber/lo" "github.com/tidwall/buntdb" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" ) -var managedJobs = map[types.UID]managed_job.ManagedJob{} +type Manager struct { + client *api.Client + managedJobs map[types.UID]managed_job.ManagedJob +} -func Setup(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { +func NewManager(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) Manager { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) + mgr := Manager{ + client: client, + managedJobs: map[types.UID]managed_job.ManagedJob{}, + } go func() { for { select { case <-ticker.C: - cycle(client, db, crdInstance) + mgr.cycle(client, db, crdInstance) case <-quit: ticker.Stop() return } } }() + return mgr } -func cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { +func (mgr *Manager) cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { + jobsOverviews, err := mgr.client.JobsOverview() + if err != nil { + lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err)) + } + //lc.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews)) + for _, uid := range crd.GetAllJobKeys() { def := crd.GetJob(uid) - managedJob, ok := managedJobs[uid] + managedJob, ok := mgr.managedJobs[uid] if ok { managedJob.Update(def) } else { - managedJob := managed_job.NewManagedJob(client, db, def, crdInstance) - managedJobs[uid] = *managedJob + managedJob = *managed_job.NewManagedJob(client, db, def, crdInstance) + //mgr.managedJobs[uid] = managedJob } + jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool { + jobId := managedJob.GetJobId() + if jobId != nil { + return job.ID == *jobId + } + return false + }) + if ok { + lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State)) + managedJob.SetStatus(managed_job.JobStatus(jobOverview.State)) + } + managedJob.Cycle() + mgr.managedJobs[uid] = managedJob } }