feat: add restore statuses to kubernetes crd

This commit is contained in:
Mohamad Khani 2024-12-08 00:47:53 +03:30
parent c5b19d3336
commit 5abc044d69
9 changed files with 101 additions and 39 deletions

View File

@ -69,6 +69,8 @@ spec:
type: string
jobId:
type: string
jarId:
type: string
error:
type: string
lastSavepointPath:
@ -80,6 +82,14 @@ spec:
lastSavepointDate:
type: string
format: time
lastRestoredSavepointDate:
type: string
format: time
lastRestoredSavepointRestoredDate:
type: string
format: time
restoredCount:
type: number
additionalPrinterColumns:
- name: Status
type: string
@ -90,3 +100,12 @@ spec:
- name: Life Cycle Status
type: string
jsonPath: .status.lifeCycleStatus
- name: Last Savepoint
type: date
jsonPath: .status.lastSavepointDate
- name: Last Restored Savepoint
type: date
jsonPath: .status.lastRestoredSavepointDate
- name: Restored Count
type: number
jsonPath: .status.restoredCount

View File

@ -19,7 +19,7 @@ func (crd Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
return fmt.Errorf("error marshaling patch data: %w", err)
}
// Patch the status subresource
// Patch the status sub-resource
unstructuredJob, err := crd.client.
Namespace(job.GetNamespace()).
Patch(
@ -49,3 +49,14 @@ func (crd Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
}
return nil
}
// PatchAll applies the given status patch to every known job, in the
// order returned by GetAllJobKeys. It stops at the first failure and
// returns that error annotated with the job key, so the caller can tell
// which job rejected the patch. Jobs after the failing one are not
// patched.
func (crd Crd) PatchAll(patchData map[string]interface{}) error {
	for _, key := range GetAllJobKeys() {
		if err := crd.Patch(key, patchData); err != nil {
			return fmt.Errorf("patching job %s: %w", key, err)
		}
	}
	return nil
}

View File

@ -22,10 +22,14 @@ type FlinkJobStatus struct {
JobStatus JobStatus `json:"jobStatus,omitempty"`
LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"`
LastSavepointPath *string `json:"lastSavepointPath,omitempty"`
JarId *string `json:"jarId,omitempty"`
JobId *string `json:"jobId,omitempty"`
Error *string `json:"error,omitempty"`
SavepointTriggerId *string `json:"savepointTriggerId,omitempty"`
LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"`
LastRestoredSavepointDate *time.Time `json:"lastRestoredSavepointDate,omitempty"`
LastRestoredSavepointRestoredDate *time.Time `json:"lastRestoredSavepointRestoredDate,omitempty"`
RestoredCount int `json:"restoredCount,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@ -71,6 +75,7 @@ type LifeCycleStatus string
// Life-cycle states reported on the FlinkJob status sub-resource
// (surfaced via the CRD's "Life Cycle Status" printer column).
const (
// The job is being set up for the first time.
LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING"
// The job is being re-run from a previously taken savepoint.
LifeCycleStatusRestoring LifeCycleStatus = "RESTORING"
// The Flink job manager could not be reached (see Manager.cycle, which
// sets this when JobsOverview fails).
LifeCycleStatusUnhealthyJobManager LifeCycleStatus = "UNHEALTHY_JOB_MANAGER"
// The job reports a RUNNING state from Flink.
LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY"
// The job has failed.
LifeCycleStatusFailed LifeCycleStatus = "FAILED"
)

View File

@ -10,7 +10,6 @@ import (
// ManagedJob couples a FlinkJob CRD definition with the Flink REST
// client and CRD accessor used to drive the job through its life cycle.
type ManagedJob struct {
def v1alpha1.FlinkJob // desired/observed state read from the Kubernetes CRD
client *api.Client // Flink REST API client used to run jars and query jobs
jarId string // NOTE(review): superseded by def.Status.JarId elsewhere in this commit — candidate for removal
crd *crd.Crd // accessor used to patch the CRD status sub-resource
}

View File

@ -1,7 +1,9 @@
package managed_job
import (
"errors"
"flink-kube-operator/internal/crd/v1alpha1"
"time"
"gitea.com/logicamp/lc"
api "github.com/logi-camp/go-flink-client"
@ -14,9 +16,14 @@ func (job *ManagedJob) restore() error {
lc.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
return v1alpha1.ErrNoSavepointPath
}
if job.def.Status.JarId == nil {
err := errors.New("missing jar id")
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
return err
}
lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.def.Status.LastSavepointPath))
runJarResp, err := job.client.RunJar(api.RunOpts{
JarID: job.jarId,
JarID: *job.def.Status.JarId,
AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass,
SavepointPath: *job.def.Status.LastSavepointPath,
@ -35,6 +42,9 @@ func (job *ManagedJob) restore() error {
"jobId": &runJarResp.JobId,
"jobStatus": v1alpha1.JobStatusCreating,
"lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring,
"lastRestoredSavepointDate": job.def.Status.LastRestoredSavepointDate,
"restoredCount": job.def.Status.RestoredCount + 1,
"lastRestoredSavepointRestoreDate": time.Now().Format(time.RFC3339),
"error": nil,
},
})

View File

@ -1,6 +1,7 @@
package managed_job
import (
"errors"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/jar"
@ -24,14 +25,24 @@ func (job *ManagedJob) upload() error {
}
lc.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", fileName))
job.jarId = fileName
job.def.Status.JarId = &fileName
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"jarId": job.def.Status.JarId,
},
})
return nil
}
// run the job from saved jarId in managedJob
func (job *ManagedJob) run() error {
if job.def.Status.JarId == nil {
err := errors.New("missing jar id")
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
return err
}
runJarResp, err := job.client.RunJar(api.RunOpts{
JarID: job.jarId,
JarID: *job.def.Status.JarId,
AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass,
})

View File

@ -55,7 +55,7 @@ func (job ManagedJob) trackSavepoint() error {
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"lastSavepointPath": resp.Operation.Location,
"lastSavepointDate": time.Now(),
"lastSavepointDate": time.Now().Format(time.RFC3339),
},
})
}

View File

@ -1,8 +0,0 @@
package manager
import api "github.com/logi-camp/go-flink-client"
// checkJobStatus is an unimplemented stub: it performs no checks and
// always returns nil. Status polling appears to be handled by the
// manager's cycle loop instead — this file is removed in this commit.
func (mgr *Manager) checkJobStatus(client *api.Client) error {
return nil
}

View File

@ -41,22 +41,36 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
}
func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
jobsOverviews, err := mgr.client.JobsOverview()
if err != nil {
lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err))
jobManagerJobOverviews, jobManagerJobStatusError := mgr.client.JobsOverview()
if jobManagerJobStatusError != nil {
lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(jobManagerJobStatusError))
crdInstance.PatchAll(map[string]interface{}{
"status": map[string]interface{}{
"jobStatus": "",
"lifeCycleStatus": v1alpha1.LifeCycleStatusUnhealthyJobManager,
},
})
}
//lc.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews))
// Loop over job definitions as Kubernetes CRD
for _, uid := range crd.GetAllJobKeys() {
// Get job definition from Kubernetes CRD
def := crd.GetJob(uid)
// Check if job exists in manager managed jobs
managedJob, ok := mgr.managedJobs[uid]
if ok {
managedJob.Update(def)
} else {
// Add job to manager managed job
managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
//mgr.managedJobs[uid] = managedJob
}
jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool {
if jobManagerJobStatusError != nil {
}
jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
jobId := managedJob.GetJobId()
if jobId != nil {
return job.ID == *jobId
@ -64,20 +78,21 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
return false
})
if ok {
lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State))
lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
var jobLifeCycleStatus *string
if jobOverview.State == string(v1alpha1.JobStatusRunning) {
if jobManagerJobOverview.State == string(v1alpha1.JobStatusRunning) {
status := string(v1alpha1.LifeCycleStatusHealthy)
jobLifeCycleStatus = &status
}
crdInstance.Patch(uid, map[string]interface{}{
"status": map[string]interface{}{
"jobStatus": v1alpha1.JobStatus(jobOverview.State),
"jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
"lifeCycleStatus": jobLifeCycleStatus,
},
})
}
managedJob.Cycle()
mgr.managedJobs[uid] = managedJob
}