fix: resolve issue with jar upgrade not uploading new jar

Ensure the new jar is uploaded during the upgrade process. Previously, the old jar was not replaced as expected.
Mohamad Khani 2024-12-13 19:41:59 +03:30
parent 699cf12f72
commit d36b9c30df
11 changed files with 153 additions and 45 deletions
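
For orientation, the flow this commit establishes can be summarized by the following minimal sketch. It paraphrases the diffs below rather than quoting them: upgrade() now pauses the running job with a savepoint and restarts it via run(true), and run() falls back to upload() whenever no jar id is recorded in the status, so the new jar actually reaches the cluster. The helper name upgradeFlowSketch and the comment about when the jar id is cleared are illustrative assumptions, not part of the commit.

// Sketch of the corrected upgrade path (illustrative; see the diffs below).
func (job *ManagedJob) upgradeFlowSketch() error {
    // Stop the job with a savepoint and record lastSavepointPath / lifecycle status.
    if err := job.pause(); err != nil {
        return err
    }
    // Restore mode: restart from the recorded savepoint. Inside run(), a nil
    // Status.JarId makes it call upload() first, which pushes the jar from
    // Spec.JarURI (assumption: the stale jar id is invalidated elsewhere when
    // Spec.JarURI changes).
    return job.run(true)
}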

View File

@ -90,6 +90,8 @@ spec:
                format: time
              runningJarURI:
                type: string
              pauseSavepointTriggerId:
                type: string
              restoredCount:
                type: number
      additionalPrinterColumns:

View File

@ -1,17 +1,14 @@
package crd

import (
    "flink-kube-operator/pkg"
    "github.com/reactivex/rxgo/v2"
    "go.uber.org/zap"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
    for j := range jobEventObservable.Observe() {
        jobEvent := j.V.(*FlinkJobCrdEvent)
        pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName()))
        //pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName()))
        controllerutil.AddFinalizer(jobEvent.Job, "")
    }
}

View File

@ -41,8 +41,6 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error
    newJob, err := convertFromUnstructured(unstructuredJob)
    if err != nil {
        pkg.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err))
    } else {
        pkg.Logger.Debug("[crd] [status] patched")
    }
    jobs[jobUid] = newJob
    if err != nil {

View File

@ -26,6 +26,7 @@ type FlinkJobStatus struct {
    JobId *string `json:"jobId,omitempty"`
    Error *string `json:"error,omitempty"`
    SavepointTriggerId *string `json:"savepointTriggerId,omitempty"`
    PauseSavepointTriggerId *string `json:"pauseSavepointTriggerId,omitempty"`
    LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"`
    LastRestoredSavepointDate *time.Time `json:"lastRestoredSavepointDate,omitempty"`
    LastRestoredSavepointRestoredDate *time.Time `json:"lastRestoredSavepointRestoredDate,omitempty"`
@ -77,7 +78,9 @@ type LifeCycleStatus string
const (
    LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING"
    LifeCycleStatusRestoring LifeCycleStatus = "RESTORING"
    LifeCycleStatusGracefulStopFailed LifeCycleStatus = "GRACEFUL_STOP_FAILED"
    LifeCycleStatusUpgradeFailed LifeCycleStatus = "UPGRADE_FAILED"
    LifeCycleStatusGracefullyPaused LifeCycleStatus = "GRACEFULLY_PAUSED"
    LifeCycleStatusUnhealthyJobManager LifeCycleStatus = "UNHEALTHY_JOB_MANAGER"
    LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY"
    LifeCycleStatusFailed LifeCycleStatus = "FAILED"

View File

@ -10,11 +10,16 @@ import (
)

func (job *ManagedJob) Cycle() {
    pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID)))
    pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
    // Init job
    if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
        job.run()
        job.run(false)
        return
    }
    if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused {
        job.run(true)
        return
    }
@ -32,6 +37,10 @@ func (job *ManagedJob) Cycle() {
        return
    }
    if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
        return
    }
    // if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
    // //job.restore()
    // return

View File

@ -0,0 +1,71 @@
package managed_job

import (
    "flink-kube-operator/internal/crd/v1alpha1"
    "flink-kube-operator/pkg"
    "os"
    "time"

    api "github.com/logi-camp/go-flink-client"
    "go.uber.org/zap"
)

func (job *ManagedJob) pause() error {
    var err error
    if job.def.Status.JobId != nil {
        result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
        if stopJobErr != nil {
            err = stopJobErr
            pkg.Logger.Error("[managed-job] [pause] cannot stop job", zap.Error(err))
            return err
        }
        var savepointPath string
        for {
            trackResult, err := job.client.TrackSavepoint(*job.def.Status.JobId, result.RequestID)
            time.Sleep(time.Millisecond * 500)
            if err == nil && trackResult.Status.Id == api.SavepointStatusInCompleted {
                if trackResult.Operation.Location != "" {
                    savepointPath = trackResult.Operation.Location
                }
                break
            }
        }
        if savepointPath != "" {
            job.def.Status.LastSavepointPath = &savepointPath
            job.def.Status.PauseSavepointTriggerId = nil
            job.def.Status.JobStatus = ""
            job.def.Status.LastSavepointPath = &savepointPath
            lastSavepointDate := time.Now()
            job.def.Status.LastSavepointDate = &lastSavepointDate
            job.crd.Patch(job.def.UID, map[string]interface{}{
                "status": map[string]interface{}{
                    "jobStatus": "FINISHED",
                    "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefullyPaused,
                    "savepointTriggerId": nil,
                    "lastSavepointPath": savepointPath,
                    "lastSavepointDate": lastSavepointDate.Format(time.RFC3339),
                },
            })
            pkg.Logger.Info(
                "[managed-job] job paused successfully",
                zap.String("jobName", job.def.GetName()),
                zap.String("savepointPath", savepointPath),
            )
        } else {
            job.crd.Patch(job.def.UID, map[string]interface{}{
                "status": map[string]interface{}{
                    "lifeCycleStatus": v1alpha1.LifeCycleStatusGracefulStopFailed,
                    "lastSavepointPath": savepointPath,
                    "lastSavepointDate": time.Now().Format(time.RFC3339),
                },
            })
            pkg.Logger.Error(
                "[managed-job] error in pausing job",
                zap.Error(err),
            )
            return err
        }
    }
    return nil
}

View File

@ -12,22 +12,29 @@ import (
)

// run the job from savepoint and jarId in managedJob
func (job *ManagedJob) run() error {
func (job *ManagedJob) run(restoreMode bool) error {
    var savepointPath string
    if job.def.Status.LastSavepointPath == nil {
        pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
        //return v1alpha1.ErrNoSavepointPath
        if restoreMode {
            return v1alpha1.ErrNoSavepointPath
        }
    } else {
        savepointPath = *job.def.Status.LastSavepointPath
    }
    pkg.Logger.Info("[managed-job] [restore] restoring job", zap.String("name", job.def.GetName()), zap.String("savepointPath", savepointPath))
    pkg.Logger.Info(
        "[managed-job] [restore] starting job...",
        zap.Bool("restoreMode", restoreMode),
        zap.String("name", job.def.GetName()),
        zap.String("savepointPath", savepointPath),
    )
    var jobId *string
    for {
        shouldUpload := false
        if job.def.Status.JarId == nil {
            err := v1alpha1.ErrNoJarId
            pkg.Logger.Error("[managed-job] [run]", zap.Error(err))
            pkg.Logger.Warn("[managed-job] [run] will upload new jar...", zap.Error(err))
            shouldUpload = true
        } else {
            runJarResp, err := job.client.RunJar(api.RunOpts{
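
The hunk ends mid-call, before the branch that consumes shouldUpload. For illustration only, the pattern it implies looks roughly like the following (a hypothetical continuation, not part of the commit; it reuses only names visible above such as shouldUpload, upload, and Status.JarId):

        // Hypothetical continuation of the retry loop in run(): when no jar id
        // is recorded, upload the jar referenced by Spec.JarURI, then retry the
        // loop with the freshly stored Status.JarId.
        if shouldUpload {
            if uploadErr := job.upload(); uploadErr != nil {
                return uploadErr
            }
            continue
        }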

View File

@ -23,7 +23,7 @@ func (job ManagedJob) createSavepoint() error {
pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
return err
}
pkg.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp))
pkg.Logger.Debug("[managed-job] [savepoint] savepoint created successfully", zap.String("trigger-id", resp.RequestID))
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{

View File

@ -1,28 +1,38 @@
package managed_job

import (
    "flink-kube-operator/internal/crd/v1alpha1"
    "flink-kube-operator/pkg"

    "go.uber.org/zap"
)

func (job *ManagedJob) upgrade() {
    if job.def.Status.LastSavepointPath != nil {
        pkg.Logger.Info("upgrading job ",
        pkg.Logger.Info("[managed-job] [upgrade] pausing... ",
            zap.String("jobName", job.def.GetName()),
            zap.String("currentJarURI", job.def.Spec.JarURI),
            zap.String("prevJarURI", *job.def.Status.RunningJarURI),
        )
        job.run()
    } else {
        err := "There is no savepoint path existing"
        pkg.Logger.Error(err)
        job.def.Status.JarId = nil
        job.crd.Patch(job.def.UID, map[string]interface{}{
            "status": map[string]interface{}{
                "lifeCycleStatus": v1alpha1.LifeCycleStatusUpgradeFailed,
                "error": err,
                "jarId": job.def.Status.JarId,
            },
        })
        err := job.pause()
        if err != nil {
            pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err))
            return
        }
        pkg.Logger.Info("[managed-job] [upgrade] restoring... ",
            zap.String("jobName", job.def.GetName()),
            zap.String("currentJarURI", job.def.Spec.JarURI),
            zap.String("prevJarURI", *job.def.Status.RunningJarURI),
            zap.Error(err),
        )
        err = job.run(true)
        if err != nil {
            pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err))
            return
        }
    }

View File

@ -12,19 +12,19 @@ import (
func (job *ManagedJob) upload() error {
    jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
    if err != nil {
        pkg.Logger.Debug("[main] error on download jar", zap.Error(err))
        pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
        return err
    }
    jarId, err := jarFile.Upload(job.client)
    if err != nil {
        pkg.Logger.Debug("[main] error on upload jar", zap.Error(err))
        pkg.Logger.Debug("[manage-job] [upload] error on upload jar", zap.Error(err))
        return err
    }
    err = jarFile.Delete()
    if err != nil {
        pkg.Logger.Debug("[main] error on delete jar", zap.Error(err))
        pkg.Logger.Debug("[manage-job] [upload] error on delete jar", zap.Error(err))
    }
    pkg.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", jarId))
    pkg.Logger.Info("[manage-job] [upload] uploaded", zap.Any("upload-jar-resp", jarId))
    job.def.Status.JarId = &jarId
    job.crd.Patch(job.def.UID, map[string]interface{}{

View File

@ -17,6 +17,7 @@ import (
type Manager struct {
    client *api.Client
    managedJobs map[types.UID]managed_job.ManagedJob
    processingJobsIds []types.UID
}

func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
@ -25,8 +26,10 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
    mgr := Manager{
        client: client,
        managedJobs: map[types.UID]managed_job.ManagedJob{},
        processingJobsIds: []types.UID{},
    }
    mgr.cycle(client, crdInstance)
    go func() {
        for {
            select {
@ -47,7 +50,6 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
pkg.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError))
crdInstance.PatchAll(map[string]interface{}{
"status": map[string]interface{}{
"jobStatus": "",
"lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager,
},
})
@ -56,8 +58,14 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
    // Loop over job definitions as Kubernetes CRD
    for _, uid := range crd.GetAllJobKeys() {
        if lo.Contains(mgr.processingJobsIds, uid) {
            pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
            continue
        }

        // Get job definition from Kubernetes CRD
        def := crd.GetJob(uid)
        mgr.processingJobsIds = append(mgr.processingJobsIds, uid)

        // Check if job exists in manager managed jobs
        managedJob, ok := mgr.managedJobs[uid]
@ -80,21 +88,24 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
        })
        if ok {
            pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
            var jobLifeCycleStatus *string
            patchStatusObj := map[string]interface{}{
                "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
            }
            if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) {
                status := string(v1alpha1.LifeCycleStatusHealthy)
                jobLifeCycleStatus = &status
                patchStatusObj["lifeCycleStatus"] = &status
            }
            crdInstance.Patch(uid, map[string]interface{}{
                "status": map[string]interface{}{
                    "jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
                    "lifeCycleStatus": jobLifeCycleStatus,
                },
                "status": patchStatusObj,
            })
        }
        managedJob.Cycle()
        mgr.managedJobs[uid] = managedJob
        mgr.processingJobsIds = lo.Filter(mgr.processingJobsIds, func(current types.UID, i int) bool {
            return current != uid
        })
    }
}