Ensure the new jar is properly uploaded during an upgrade process. Previously, the jar was not replaced as expected.
92 lines
2.6 KiB
Go
92 lines
2.6 KiB
Go
package managed_job
|
|
|
|
import (
|
|
"flink-kube-operator/internal/crd/v1alpha1"
|
|
"strings"
|
|
"time"
|
|
|
|
"flink-kube-operator/pkg"
|
|
|
|
api "github.com/logi-camp/go-flink-client"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// run the job from savepoint and jarId in managedJob
|
|
func (job *ManagedJob) run(restoreMode bool) error {
|
|
var savepointPath string
|
|
if job.def.Status.LastSavepointPath == nil {
|
|
pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
|
|
if restoreMode {
|
|
return v1alpha1.ErrNoSavepointPath
|
|
}
|
|
} else {
|
|
savepointPath = *job.def.Status.LastSavepointPath
|
|
}
|
|
|
|
pkg.Logger.Info(
|
|
"[managed-job] [restore] starting job...",
|
|
zap.Bool("restoreMode", restoreMode),
|
|
zap.String("name", job.def.GetName()),
|
|
zap.String("savepointPath", savepointPath),
|
|
)
|
|
var jobId *string
|
|
for {
|
|
shouldUpload := false
|
|
if job.def.Status.JarId == nil {
|
|
err := v1alpha1.ErrNoJarId
|
|
pkg.Logger.Warn("[managed-job] [run] will upload new jar...", zap.Error(err))
|
|
shouldUpload = true
|
|
} else {
|
|
runJarResp, err := job.client.RunJar(api.RunOpts{
|
|
JarID: *job.def.Status.JarId,
|
|
AllowNonRestoredState: true,
|
|
EntryClass: job.def.Spec.EntryClass,
|
|
SavepointPath: savepointPath,
|
|
})
|
|
if err == nil {
|
|
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
|
jobId = &runJarResp.JobId
|
|
break
|
|
} else {
|
|
if strings.ContainsAny(err.Error(), ".jar does not exist") {
|
|
shouldUpload = true
|
|
} else {
|
|
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
if shouldUpload {
|
|
err := job.upload()
|
|
if err != nil {
|
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
|
"status": map[string]interface{}{
|
|
"error": "[upload-error] " + err.Error(),
|
|
},
|
|
})
|
|
return nil
|
|
}
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// job.def.Status.JobId = &runJarResp.JobId
|
|
// job.def.Status.JobStatus = v1alpha1.JobStatusCreating
|
|
// job.def.Status.Error = nil
|
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
|
"status": map[string]interface{}{
|
|
"jobId": jobId,
|
|
"runningJarURI": job.def.Spec.JarURI,
|
|
"jobStatus": v1alpha1.JobStatusCreating,
|
|
"lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring,
|
|
"lastRestoredSavepointDate": job.def.Status.LastSavepointDate,
|
|
"restoredCount": job.def.Status.RestoredCount + 1,
|
|
"lastRestoredSavepointRestoredDate": time.Now().Format(time.RFC3339),
|
|
"error": nil,
|
|
},
|
|
})
|
|
|
|
return nil
|
|
}
|