From 3e77ac121a78bdb00b8f9723a09e2f24109bcf6f Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sun, 1 Dec 2024 23:55:58 +0330 Subject: [PATCH] feat(crd): add update custom resource status --- cmd/operator/main.go | 4 +-- internal/crd/convert.go | 33 ++++++++++++++++++++ internal/crd/event.go | 48 ++++++++++++++++++++++++++++-- internal/crd/new.go | 3 +- internal/crd/v1alpha1/flink_job.go | 8 ++++- internal/managed_job/cycle.go | 4 ++- internal/managed_job/new.go | 5 +++- internal/manager/manager.go | 8 ++--- 8 files changed, 100 insertions(+), 13 deletions(-) create mode 100644 internal/crd/convert.go diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 43bb00d..0618c71 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -21,7 +21,7 @@ func main() { config := lc.LoadYamlConfig[config.Config]("./config.yaml") // init kubernetes flink job crd watch - crd.New() + crdInstance := crd.New() // create database instance db, err := buntdb.Open(config.DatabasePath) @@ -44,7 +44,7 @@ func main() { fmt.Println(clusterConfig) // init flink job manager - manager.Setup(c, db) + manager.Setup(c, db, crdInstance) // for _, jobDef := range config.Jobs { // managed_job.NewManagedJob(c, db, jobDef) diff --git a/internal/crd/convert.go b/internal/crd/convert.go new file mode 100644 index 0000000..eb2beff --- /dev/null +++ b/internal/crd/convert.go @@ -0,0 +1,33 @@ +package crd + +import ( + "flink-kube-operator/internal/crd/v1alpha1" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +func convertToUnstructured(obj v1alpha1.FlinkJob) (*unstructured.Unstructured, error) { + // Convert the structured object to an unstructured map + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, fmt.Errorf("failed to convert to unstructured: %v", err) + } + + // Create an Unstructured object from the map + unstructuredObj := &unstructured.Unstructured{ + Object: unstructuredMap, + } + + return unstructuredObj, nil +} + +func convertFromUnstructured(in *unstructured.Unstructured) (*v1alpha1.FlinkJob, error) { + job := &v1alpha1.FlinkJob{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(in.Object, job) + if err != nil { + return nil, nil + } + return job, nil +} diff --git a/internal/crd/event.go b/internal/crd/event.go index 8156728..b5c42bf 100644 --- a/internal/crd/event.go +++ b/internal/crd/event.go @@ -2,11 +2,53 @@ package crd import ( "context" + "encoding/json" + "flink-kube-operator/internal/crd/v1alpha1" + "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "gitea.com/logicamp/lc" + "go.uber.org/zap" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) -func (crd Crd) AddEvent(jobUid types.UID, event string) { - crd.client.UpdateStatus(context.Background(), nil, v1.UpdateOptions{}) +func (crd Crd) SetJobStatus(jobUid types.UID, status string) error { + job := GetJob(jobUid) + // Define the patch data (JSON Merge Patch format) + patchData := map[string]interface{}{ + "status": v1alpha1.FlinkJobStatus{}, + } + patchBytes, err := json.Marshal(patchData) + if err != nil { + return fmt.Errorf("error marshaling patch data: %w", err) + } + + // Patch the status subresource + unstructuredJob, err := crd.client. + Namespace(job.GetNamespace()). + Patch( + context.Background(), + job.GetName(), + types.MergePatchType, // Use MergePatchType for JSON Merge Patch + patchBytes, + metaV1.PatchOptions{}, + ) + if err != nil { + lc.Logger.Error( + "[crd] [status] error patching custom resource status", + zap.String("namespace", job.GetNamespace()), + zap.Error(err), + ) + return err + } + patched, err := convertFromUnstructured(unstructuredJob) + if err != nil { + lc.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err)) + } + lc.Logger.Debug("[crd] [status] set status", zap.Any("statusUpdateObj", patched)) + if err != nil { + lc.Logger.Error("[crd] [status] ", zap.Error(err)) + return err + } + return nil } diff --git a/internal/crd/new.go b/internal/crd/new.go index bbcc7f2..caa6405 100644 --- a/internal/crd/new.go +++ b/internal/crd/new.go @@ -12,7 +12,7 @@ type Crd struct { client dynamic.NamespaceableResourceInterface } -func New() { +func New() *Crd { // Get Kubernetes config config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { @@ -37,4 +37,5 @@ func New() { // Watch for FlinkJob creation go crd.watchFlinkJobs() + return &crd } diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index e65b403..927ab38 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -15,11 +15,17 @@ type FlinkJobSpec struct { EntryClass string `json:"entryClass"` } +type FlinkJobStatus struct { + JobStatus *string `json:"jobStatus,omitempty"` + LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type FlinkJob struct { metaV1.TypeMeta `json:",inline"` metaV1.ObjectMeta `json:"metadata,omitempty"` - Spec FlinkJobSpec `json:"spec"` + Spec FlinkJobSpec `json:"spec"` + Status FlinkJobStatus `json:"status"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index c823679..9cdbab9 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -44,6 +44,8 @@ func (job *ManagedJob) cycle() { } return } + job.crd.SetJobStatus(job.def.UID, string(job.state.Status)) + // Check for set running or error state if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing { err := job.checkStatus() @@ -72,6 +74,6 @@ func (job *ManagedJob) cycle() { job.restore() return } - lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status))) + lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status))) } diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go index 730c1a0..3a2a4bd 100644 --- a/internal/managed_job/new.go +++ b/internal/managed_job/new.go @@ -1,6 +1,7 @@ package managed_job import ( + "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" api "github.com/logi-camp/go-flink-client" @@ -13,13 +14,15 @@ type ManagedJob struct { jarId string db *buntdb.DB state *jobState + crd *crd.Crd } -func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *ManagedJob { +func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob, crd *crd.Crd) *ManagedJob { job := &ManagedJob{ def: def, client: client, db: db, + crd: crd, } job.startCycle() return job diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 5fa783f..426b8d4 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -12,7 +12,7 @@ import ( var managedJobs = map[types.UID]managed_job.ManagedJob{} -func Setup(client *api.Client, db *buntdb.DB) { +func Setup(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { ticker := time.NewTicker(5 * time.Second) quit := make(chan struct{}) @@ -20,7 +20,7 @@ func Setup(client *api.Client, db *buntdb.DB) { for { select { case <-ticker.C: - cycle(client, db) + cycle(client, db, crdInstance) case <-quit: ticker.Stop() return @@ -29,14 +29,14 @@ func Setup(client *api.Client, db *buntdb.DB) { }() } -func cycle(client *api.Client, db *buntdb.DB) { +func cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) { for _, uid := range crd.GetAllJobKeys() { def := crd.GetJob(uid) managedJob, ok := managedJobs[uid] if ok { managedJob.Update(def) } else { - managedJob := managed_job.NewManagedJob(client, db, def) + managedJob := managed_job.NewManagedJob(client, db, def, crdInstance) managedJobs[uid] = *managedJob } }