Mohammadreza Khani d36b9c30df fix: resolve issue with jar upgrade not uploading new jar
Ensure the new jar is properly uploaded during an upgrade process. Previously, the jar was not replaced as expected.
2024-12-13 19:41:59 +03:30

72 lines
2.1 KiB
Go

package managed_job
import (
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/pkg"
"os"
"time"
api "github.com/logi-camp/go-flink-client"
"go.uber.org/zap"
)
func (job *ManagedJob) pause() error {
var err error
if job.def.Status.JobId != nil {
result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
if stopJobErr != nil {
err = stopJobErr
pkg.Logger.Error("[managed-job] [pause] cannot stop job", zap.Error(err))
return err
}
var savepointPath string
for {
trackResult, err := job.client.TrackSavepoint(*job.def.Status.JobId, result.RequestID)
time.Sleep(time.Millisecond * 500)
if err == nil && trackResult.Status.Id == api.SavepointStatusInCompleted {
if trackResult.Operation.Location != "" {
savepointPath = trackResult.Operation.Location
}
break
}
}
if savepointPath != "" {
job.def.Status.LastSavepointPath = &savepointPath
job.def.Status.PauseSavepointTriggerId = nil
job.def.Status.JobStatus = ""
job.def.Status.LastSavepointPath = &savepointPath
lastSavepointDate := time.Now()
job.def.Status.LastSavepointDate = &lastSavepointDate
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"jobStatus": "FINISHED",
"lifeCycleStatus": v1alpha1.LifeCycleStatusGracefullyPaused,
"savepointTriggerId": nil,
"lastSavepointPath": savepointPath,
"lastSavepointDate": lastSavepointDate.Format(time.RFC3339),
},
})
pkg.Logger.Info(
"[managed-job] job paused successfully",
zap.String("jobName", job.def.GetName()),
zap.String("savepointPath", savepointPath),
)
} else {
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"lifeCycleStatus": v1alpha1.LifeCycleStatusGracefulStopFailed,
"lastSavepointPath": savepointPath,
"lastSavepointDate": time.Now().Format(time.RFC3339),
},
})
pkg.Logger.Error(
"[managed-job] error in pausing job",
zap.Error(err),
)
return err
}
}
return nil
}