From 9b219d967e98e3fcbdda72515939d1e9f3ea36dd Mon Sep 17 00:00:00 2001
From: Mohammadreza Khani
Date: Fri, 13 Dec 2024 15:00:43 +0330
Subject: [PATCH] feat: handle upgrade job

---
 .air.toml                          |  2 +-
 .dockerignore                      |  4 ++
 .env.example                       |  3 +-
 .gitignore                         |  3 +-
 .vscode/launch.json                |  4 ++
 crds.yaml                          |  2 +
 internal/crd/v1alpha1/flink_job.go | 25 ++++----
 internal/crd/watch.go              |  4 +-
 internal/managed_job/cycle.go      | 94 +++++------------------------
 internal/managed_job/run.go        | 69 ++++++++++++----------
 internal/managed_job/upgrade.go    | 28 +++++++++
 internal/managed_job/upload.go     | 36 ++++++++++++
 12 files changed, 147 insertions(+), 127 deletions(-)
 create mode 100644 internal/managed_job/upgrade.go
 create mode 100644 internal/managed_job/upload.go

diff --git a/.air.toml b/.air.toml
index e6d7edb..63a9bb5 100644
--- a/.air.toml
+++ b/.air.toml
@@ -4,7 +4,7 @@ tmp_dir = "tmp"
 
 [build]
   args_bin = []
-  bin = "./tmp/main"
+  bin = ";set -o allexport && source ./.env && set +o allexport; ./tmp/main"
   cmd = "go build -o ./tmp/main ./cmd/operator"
   delay = 1000
   exclude_dir = ["assets", "tmp", "vendor", "testdata"]
diff --git a/.dockerignore b/.dockerignore
index 73a24f6..4b8a6f6 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1 +1,5 @@
 Tiltfile
+Dockerfile
+.env.example
+.git
+.gitignore
\ No newline at end of file
diff --git a/.env.example b/.env.example
index 0502fad..18ded18 100644
--- a/.env.example
+++ b/.env.example
@@ -1 +1,2 @@
-FLINK_API_URL=127.0.0.1:8081
\ No newline at end of file
+FLINK_API_URL=127.0.0.1:8081
+SAVEPOINT_PATH=/opt/flink/savepoints
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 1331240..071e3b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 db
 *.log
 tmp
-*.jar
\ No newline at end of file
+*.jar
+.env
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index af96def..e4bbc26 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -9,6 +9,10 @@
             "type": "go",
             "request": "launch",
             "mode": "auto",
+            "env": {
+                "FLINK_API_URL": "127.0.0.1:8081",
+                "SAVEPOINT_PATH": "/opt/flink/savepoints"
+            },
             "cwd": "${workspaceFolder}",
             "program": "${workspaceFolder}/cmd/operator"
         }
diff --git a/crds.yaml b/crds.yaml
index d5d3ccf..9e898c2 100644
--- a/crds.yaml
+++ b/crds.yaml
@@ -88,6 +88,8 @@ spec:
                 lastRestoredSavepointRestoredDate:
                   type: string
                   format: time
+                runningJarURI:
+                  type: string
                 restoredCount:
                   type: number
           additionalPrinterColumns:
diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go
index 31017e2..5ca48cc 100644
--- a/internal/crd/v1alpha1/flink_job.go
+++ b/internal/crd/v1alpha1/flink_job.go
@@ -19,17 +19,18 @@ type FlinkJobSpec struct {
 }
 
 type FlinkJobStatus struct {
-	JobStatus                         JobStatus  `json:"jobStatus,omitempty"`
-	LifeCycleStatus                   *string    `json:"lifeCycleStatus,omitempty"`
-	LastSavepointPath                 *string    `json:"lastSavepointPath,omitempty"`
-	JarId                             *string    `json:"jarId,omitempty"`
-	JobId                             *string    `json:"jobId,omitempty"`
-	Error                             *string    `json:"error,omitempty"`
-	SavepointTriggerId                *string    `json:"savepointTriggerId,omitempty"`
-	LastSavepointDate                 *time.Time `json:"lastSavepointDate,omitempty"`
-	LastRestoredSavepointDate         *time.Time `json:"lastRestoredSavepointDate,omitempty"`
-	LastRestoredSavepointRestoredDate *time.Time `json:"lastRestoredSavepointRestoredDate,omitempty"`
-	RestoredCount                     int        `json:"restoredCount,omitempty"`
+	JobStatus                         JobStatus       `json:"jobStatus,omitempty"`
+	LifeCycleStatus                   LifeCycleStatus `json:"lifeCycleStatus,omitempty"`
+	LastSavepointPath                 *string         `json:"lastSavepointPath,omitempty"`
+	JarId                             *string         `json:"jarId,omitempty"`
+	JobId                             *string         `json:"jobId,omitempty"`
+	Error                             *string         `json:"error,omitempty"`
+	SavepointTriggerId                *string         `json:"savepointTriggerId,omitempty"`
+	LastSavepointDate                 *time.Time      `json:"lastSavepointDate,omitempty"`
+	LastRestoredSavepointDate         *time.Time      `json:"lastRestoredSavepointDate,omitempty"`
+	LastRestoredSavepointRestoredDate *time.Time      `json:"lastRestoredSavepointRestoredDate,omitempty"`
+	RestoredCount                     int             `json:"restoredCount,omitempty"`
+	RunningJarURI                     *string         `json:"runningJarURI"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -49,6 +50,7 @@ type FlinkJobList struct {
 
 var (
 	ErrNoJobId              = errors.New("[managed-job] no job id")
+	ErrNoJarId              = errors.New("[managed-job] no jar id")
 	ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
 	ErrNoSavepointPath      = errors.New("[managed-job] no savepoint path")
 )
@@ -75,6 +77,7 @@ type LifeCycleStatus string
 const (
 	LifeCycleStatusInitializing        LifeCycleStatus = "INITIALIZING"
 	LifeCycleStatusRestoring           LifeCycleStatus = "RESTORING"
+	LifeCycleStatusUpgradeFailed       LifeCycleStatus = "UPGRADE_FAILED"
 	LifeCycleStatusUnhealthyJobManager LifeCycleStatus = "UNHEALTHY_JOB_MANAGER"
 	LifeCycleStatusHealthy             LifeCycleStatus = "HEALTHY"
 	LifeCycleStatusFailed              LifeCycleStatus = "FAILED"
diff --git a/internal/crd/watch.go b/internal/crd/watch.go
index a307c6e..12b3b2a 100644
--- a/internal/crd/watch.go
+++ b/internal/crd/watch.go
@@ -3,7 +3,6 @@ package crd
 import (
 	"context"
 	"flink-kube-operator/internal/crd/v1alpha1"
-	"fmt"
 
 	"flink-kube-operator/pkg"
 
@@ -51,8 +50,7 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
 			switch event.Type {
 			case watch.Bookmark:
 			case watch.Modified:
-				pkg.Logger.Info("[crd] [watch] flink job updated")
-				fmt.Printf("FlinkJob updated: %s\n", job.GetName())
+				pkg.Logger.Info("[crd] [watch] flink job updated", zap.String("jobName", job.GetName()))
 				crd.repsert(job)
 			case watch.Added:
 				pkg.Logger.Info("[crd] [watch] new flink job created")
diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go
index d31409e..7d73248 100644
--- a/internal/managed_job/cycle.go
+++ b/internal/managed_job/cycle.go
@@ -2,7 +2,6 @@ package managed_job
 
 import (
 	"flink-kube-operator/internal/crd/v1alpha1"
-	"strings"
 	"time"
 
 	"flink-kube-operator/pkg"
@@ -10,83 +9,14 @@ import (
 	"go.uber.org/zap"
 )
 
-// func (job *ManagedJob) startCycle() {
-// 	ticker := time.NewTicker(5 * time.Second)
-// 	quit := make(chan struct{})
-
-// 	// load job state from db
-// 	job.loadState()
-// 	go func() {
-// 		for {
-// 			select {
-// 			case <-ticker.C:
-// 				job.cycle()
-// 			case <-quit:
-// 				ticker.Stop()
-// 				return
-// 			}
-// 		}
-// 	}()
-// }
-
 func (job *ManagedJob) Cycle() {
 	pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID)))
 	// Init job
-	if job.def.Status.JobStatus == "" {
-		if job.def.Status.LastSavepointPath == nil {
-			if job.def.Status.JarId == nil {
-				err := job.upload()
-				if err != nil {
-					job.crd.Patch(job.def.UID, map[string]interface{}{
-						"status": map[string]interface{}{
-							"error": "[upload-error] " + err.Error(),
-						},
-					})
-					return
-				}
-			}
-			for {
-				err := job.run()
-				if err != nil {
-					if strings.ContainsAny(err.Error(), ".jar does not exist") {
-						err := job.upload()
-						if err != nil {
-							job.crd.Patch(job.def.UID, map[string]interface{}{
-								"status": map[string]interface{}{
-									"error": "[upload-error] " + err.Error(),
-								},
-							})
-							return
-						}
-						continue
-					}
-					job.crd.Patch(job.def.UID, map[string]interface{}{
-						"status": map[string]interface{}{
-							"error": "[run-error] " + err.Error(),
-						},
-					})
-					return
-				}
-				return
-			}
-		} else {
-			job.restore()
-		}
+	if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
+		job.run()
 		return
 	}
-	// job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{
-	// 	JobStatus: job.def.Status.JobStatus,
-	// })
-
-	// Check for set running or error state
-	/* if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing {
-		err := job.checkStatus()
-		if errors.Is(err, ErrNoJobId) {
-			job.state = nil
-		}
-		return
-	} */
 
 	if job.def.Status.JobStatus == v1alpha1.JobStatusRunning {
 		if (job.def.Spec.SavepointInterval.Duration != 0) &&
 			((job.def.Status.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.def.Status.LastSavepointDate)) {
@@ -96,16 +26,20 @@ func (job *ManagedJob) Cycle() {
 				job.trackSavepoint()
 			}
 		}
+		if job.def.Status.RunningJarURI != nil && job.def.Spec.JarURI != *job.def.Status.RunningJarURI {
+			job.upgrade()
+		}
+
 		return
 	}
 
-	if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
-		job.restore()
-		return
-	}
-	if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath == nil {
-		job.restore()
-		return
-	}
+	// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
+	// 	//job.restore()
+	// 	return
+	// }
+	// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath == nil {
+	// 	//job.restore()
+	// 	return
+	// }
 	pkg.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus)))
 }
diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go
index 5983696..aebc3b2 100644
--- a/internal/managed_job/run.go
+++ b/internal/managed_job/run.go
@@ -1,7 +1,6 @@
 package managed_job
 
 import (
-	"errors"
 	"flink-kube-operator/internal/crd/v1alpha1"
 	"strings"
 	"time"
@@ -12,8 +11,8 @@ import (
 	"go.uber.org/zap"
 )
 
-// restore the job from savepoint and jarId in managedJob
-func (job *ManagedJob) restore() error {
+// run the job from savepoint and jarId in managedJob
+func (job *ManagedJob) run() error {
 	var savepointPath string
 	if job.def.Status.LastSavepointPath == nil {
 		pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
@@ -21,39 +20,48 @@ func (job *ManagedJob) restore() error {
 	} else {
 		savepointPath = *job.def.Status.LastSavepointPath
 	}
-	if job.def.Status.JarId == nil {
-		err := errors.New("missing jar id")
-		pkg.Logger.Error("[managed-job] [restore]", zap.Error(err))
-		return err
-	}
+
 	pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath))
 	var jobId *string
 	for {
-		runJarResp, err := job.client.RunJar(api.RunOpts{
-			JarID:                 *job.def.Status.JarId,
-			AllowNonRestoredState: true,
-			EntryClass:            job.def.Spec.EntryClass,
-			SavepointPath:         savepointPath,
-		})
-		if err != nil {
-			if strings.ContainsAny(err.Error(), ".jar does not exist") {
-				err := job.upload()
-				if err != nil {
-					job.crd.Patch(job.def.UID, map[string]interface{}{
-						"status": map[string]interface{}{
-							"error": "[upload-error] " + err.Error(),
-						},
-					})
-					return nil
+		shouldUpload := false
+		if job.def.Status.JarId == nil {
+			err := v1alpha1.ErrNoJarId
+			pkg.Logger.Error("[managed-job] [run]", zap.Error(err))
+			shouldUpload = true
+		} else {
+			runJarResp, err := job.client.RunJar(api.RunOpts{
+				JarID:                 *job.def.Status.JarId,
+				AllowNonRestoredState: true,
+				EntryClass:            job.def.Spec.EntryClass,
+				SavepointPath:         savepointPath,
+			})
+			if err == nil {
+				pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
+				jobId = &runJarResp.JobId
+				break
+			} else {
+				if strings.Contains(err.Error(), ".jar does not exist") {
+					shouldUpload = true
+				} else {
+					pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
 				}
-				continue
 			}
-			pkg.Logger.Error("[managed-job] [restore]", zap.Error(err))
-			return err
 		}
-		jobId = &runJarResp.JobId
-		pkg.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
-		break
+
+		if shouldUpload {
+			err := job.upload()
+			if err != nil {
+				job.crd.Patch(job.def.UID, map[string]interface{}{
+					"status": map[string]interface{}{
+						"error": "[upload-error] " + err.Error(),
+					},
+				})
+				return nil
+			}
+			continue
+		}
+		return nil
 	}
 
 	// job.def.Status.JobId = &runJarResp.JobId
@@ -62,6 +70,7 @@ func (job *ManagedJob) restore() error {
 	job.crd.Patch(job.def.UID, map[string]interface{}{
 		"status": map[string]interface{}{
 			"jobId":                     jobId,
+			"runningJarURI":             job.def.Spec.JarURI,
 			"jobStatus":                 v1alpha1.JobStatusCreating,
 			"lifeCycleStatus":           v1alpha1.LifeCycleStatusRestoring,
 			"lastRestoredSavepointDate": job.def.Status.LastSavepointDate,
diff --git a/internal/managed_job/upgrade.go b/internal/managed_job/upgrade.go
new file mode 100644
index 0000000..4f5c494
--- /dev/null
+++ b/internal/managed_job/upgrade.go
@@ -0,0 +1,28 @@
+package managed_job
+
+import (
+	"flink-kube-operator/internal/crd/v1alpha1"
+	"flink-kube-operator/pkg"
+
+	"go.uber.org/zap"
+)
+
+func (job *ManagedJob) upgrade() {
+	if job.def.Status.LastSavepointPath != nil {
+		pkg.Logger.Info("[managed-job] [upgrade] upgrading job",
+			zap.String("jobName", job.def.GetName()),
+			zap.String("currentJarURI", job.def.Spec.JarURI),
+			zap.String("prevJarURI", *job.def.Status.RunningJarURI),
+		)
+		job.run()
+	} else {
+		msg := "no savepoint path exists to upgrade from"
+		pkg.Logger.Error(msg)
+		job.crd.Patch(job.def.UID, map[string]interface{}{
+			"status": map[string]interface{}{
+				"lifeCycleStatus": v1alpha1.LifeCycleStatusUpgradeFailed,
+				"error":           msg,
+			},
+		})
+	}
+}
diff --git a/internal/managed_job/upload.go b/internal/managed_job/upload.go
new file mode 100644
index 0000000..f39911f
--- /dev/null
+++ b/internal/managed_job/upload.go
@@ -0,0 +1,36 @@
+package managed_job
+
+import (
+	"flink-kube-operator/internal/jar"
+
+	"flink-kube-operator/pkg"
+
+	"go.uber.org/zap"
+)
+
+// upload downloads the jar from JarURI, uploads it to the Flink cluster, and stores the jar id for later use
+func (job *ManagedJob) upload() error {
+	jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
+	if err != nil {
+		pkg.Logger.Debug("[managed-job] [upload] error on download jar", zap.Error(err))
+		return err
+	}
+	jarId, err := jarFile.Upload(job.client)
+	if err != nil {
+		pkg.Logger.Debug("[managed-job] [upload] error on upload jar", zap.Error(err))
+		return err
+	}
+	err = jarFile.Delete()
+	if err != nil {
+		pkg.Logger.Debug("[managed-job] [upload] error on delete jar", zap.Error(err))
+	}
+	pkg.Logger.Debug("[managed-job] [upload] after upload jar", zap.Any("upload-jar-resp", jarId))
+
+	job.def.Status.JarId = &jarId
+	job.crd.Patch(job.def.UID, map[string]interface{}{
+		"status": map[string]interface{}{
+			"jarId": job.def.Status.JarId,
+		},
+	})
+	return nil
+}