Mohammadreza Khani 830e265162 feat: apply new helm structure
use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
2025-04-05 01:39:02 +03:30

45 lines
1.2 KiB
Go

package managed_job
import (
"flink-kube-operator/internal/jar"
"flink-kube-operator/pkg"
"go.uber.org/zap"
)
// upload builds a jar file from the job spec (JarURI plus the optional basic
// auth username/password), uploads it through job.client and records the
// returned jar id in job.def.Status.JarId so later steps can reference it.
// The id is also written to the "status.jarId" field of the CRD via a patch.
//
// It returns the error from creating the jar file or from uploading it.
// A failure to patch the CRD status is logged but not returned: the upload
// itself succeeded and the id is already stored on the in-memory definition.
func (job *ManagedJob) upload() error {
	jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
	if err != nil {
		pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
		return err
	}

	jarId, err := jarFile.Upload(job.client)
	if err != nil {
		pkg.Logger.Debug("[manage-job] [upload] error on upload jar", zap.Error(err))
		return err
	}
	pkg.Logger.Info("[manage-job] [upload] uploaded", zap.Any("upload-jar-resp", jarId))

	job.def.Status.JarId = &jarId

	// Persist the jar id on the resource status. Previously the result of
	// Patch was discarded, which hid persistence failures entirely.
	if err := job.crd.Patch(job.def.UID, map[string]interface{}{
		"status": map[string]interface{}{
			"jarId": job.def.Status.JarId,
		},
	}); err != nil {
		pkg.Logger.Error("[manage-job] [upload] error on patch jarId", zap.Error(err))
	}
	return nil
}
// RemoveJar deletes the jar referenced by job.def.Status.JarId through
// job.client and then patches the CRD status to set "jarId" to nil.
// It is a no-op when no jar id is recorded.
//
// Both steps are best-effort: a failure is logged and not returned, and the
// status patch is still attempted when the delete call fails.
func (job *ManagedJob) RemoveJar() {
	if job.def.Status.JarId == nil {
		return
	}

	// Only log when there is an actual error; logging unconditionally emitted
	// error-level entries with a nil error on every successful removal.
	if err := job.client.DeleteJar(*job.def.Status.JarId); err != nil {
		pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
	}

	err := job.crd.Patch(job.def.UID, map[string]interface{}{
		"status": map[string]interface{}{
			"jarId": nil,
		},
	})
	if err != nil {
		pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
	}
}