Mohammadreza Khani d36b9c30df fix: resolve issue with jar upgrade not uploading new jar
Ensure the new jar is properly uploaded during an upgrade process. Previously, the jar was not replaced as expected.
2024-12-13 19:41:59 +03:30

112 lines
3.0 KiB
Go

package manager
import (
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/managed_job"
"time"
"flink-kube-operator/pkg"
api "github.com/logi-camp/go-flink-client"
"github.com/samber/lo"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
)
// Manager keeps one ManagedJob per job UID and drives them through its cycle
// method, using a Flink API client to read job state.
type Manager struct {
// client is the Flink API client; cycle uses it to fetch the jobs overview.
client *api.Client
// managedJobs maps a job's UID to its ManagedJob value. Values (not
// pointers) are stored, so cycle writes the updated value back after use.
managedJobs map[types.UID]managed_job.ManagedJob
// processingJobsIds lists the UIDs cycle is currently working on; cycle
// skips any UID it finds here and removes the UID once it is done with it.
processingJobsIds []types.UID
}
// NewManager builds a Manager around the given Flink client, runs one cycle
// synchronously, and then starts a background goroutine that runs a further
// cycle every 5 seconds.
//
// The Manager is returned by value, while the background goroutine keeps
// operating on the function-local variable it was copied from. Both share the
// same managedJobs map, but each has its own processingJobsIds slice header.
func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
	const interval = 5 * time.Second

	// The ticker is created before the initial cycle, so the first tick is
	// measured from here rather than from the end of that first pass.
	tick := time.NewTicker(interval)

	// NOTE(review): done is never closed or sent on inside this function and
	// is not handed to the caller, so the shutdown branch below cannot fire
	// as the code stands.
	done := make(chan struct{})

	mgr := Manager{
		client:            client,
		managedJobs:       make(map[types.UID]managed_job.ManagedJob),
		processingJobsIds: make([]types.UID, 0),
	}

	// First pass runs on the caller's goroutine, before NewManager returns.
	mgr.cycle(client, crdInstance)

	loop := func() {
		for {
			select {
			case <-done:
				tick.Stop()
				return
			case <-tick.C:
				mgr.cycle(client, crdInstance)
			}
		}
	}
	go loop()

	return mgr
}
// cycle runs one reconciliation pass over every job key returned by
// crd.GetAllJobKeys.
//
// It first asks the Flink job manager for its jobs overview. If that request
// fails, the error is logged and every custom resource is patched with the
// LifeCycleStatusUnhealthyJobManager life-cycle status; the pass then still
// continues with whatever overview value was returned.
//
// For each job key that is not already listed in mgr.processingJobsIds, it:
//   - loads the job definition and obtains the ManagedJob (updating an
//     existing one or creating a new one),
//   - patches the resource status from the matching Flink overview entry, if
//     one with the same job ID exists,
//   - runs the ManagedJob's own Cycle,
//   - stores the ManagedJob back into mgr.managedJobs and removes the key
//     from mgr.processingJobsIds.
//
// client and crdInstance are forwarded to managed_job.NewManagedJob;
// crdInstance is also used for all status patches.
func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
	overviews, overviewErr := mgr.client.JobsOverview()
	if overviewErr != nil {
		pkg.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(overviewErr))
		crdInstance.PatchAll(map[string]any{
			"status": map[string]any{
				"lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager,
			},
		})
	}

	// Loop over job definitions as Kubernetes CRD.
	for _, uid := range crd.GetAllJobKeys() {
		if lo.Contains(mgr.processingJobsIds, uid) {
			pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
			continue
		}

		// Get job definition from Kubernetes CRD and mark the job as in process.
		def := crd.GetJob(uid)
		mgr.processingJobsIds = append(mgr.processingJobsIds, uid)

		// The map holds ManagedJob values, so managedJob is a copy here; it is
		// written back to the map at the end of the iteration.
		managedJob, known := mgr.managedJobs[uid]
		if known {
			managedJob.Update(def)
		} else {
			// Add job to manager managed jobs.
			managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
		}

		// Look the job ID up once instead of once per overview entry.
		jobID := managedJob.GetJobId()
		overview, found := lo.Find(overviews.Jobs, func(job api.JobOverview) bool {
			return jobID != nil && job.ID == *jobID
		})
		if found {
			pkg.Logger.Debug("[manager] read status from flink", zap.String("name", overview.Name), zap.String("state", overview.State))
			patchStatusObj := map[string]any{
				"jobStatus": v1alpha1.JobStatus(overview.State),
			}
			// Only a running job is reported as healthy; other states leave
			// lifeCycleStatus out of the patch.
			if overview.State == string(v1alpha1.JobStatusRunning) {
				status := string(v1alpha1.LifeCycleStatusHealthy)
				patchStatusObj["lifeCycleStatus"] = &status
			}
			crdInstance.Patch(uid, map[string]any{
				"status": patchStatusObj,
			})
		}

		managedJob.Cycle()
		mgr.managedJobs[uid] = managedJob

		// Done with this job: drop its UID from the in-process list.
		mgr.processingJobsIds = lo.Filter(mgr.processingJobsIds, func(current types.UID, _ int) bool {
			return current != uid
		})
	}
}