feat(crd): add update custom resource status

This commit is contained in:
Mohamad Khani 2024-12-01 23:55:58 +03:30
parent fb646086b0
commit 3e77ac121a
8 changed files with 100 additions and 13 deletions

View File

@ -21,7 +21,7 @@ func main() {
config := lc.LoadYamlConfig[config.Config]("./config.yaml")
// init kubernetes flink job crd watch
crd.New()
crdInstance := crd.New()
// create database instance
db, err := buntdb.Open(config.DatabasePath)
@ -44,7 +44,7 @@ func main() {
fmt.Println(clusterConfig)
// init flink job manager
manager.Setup(c, db)
manager.Setup(c, db, crdInstance)
// for _, jobDef := range config.Jobs {
// managed_job.NewManagedJob(c, db, jobDef)

33
internal/crd/convert.go Normal file
View File

@ -0,0 +1,33 @@
package crd
import (
"flink-kube-operator/internal/crd/v1alpha1"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
func convertToUnstructured(obj v1alpha1.FlinkJob) (*unstructured.Unstructured, error) {
// Convert the structured object to an unstructured map
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, fmt.Errorf("failed to convert to unstructured: %v", err)
}
// Create an Unstructured object from the map
unstructuredObj := &unstructured.Unstructured{
Object: unstructuredMap,
}
return unstructuredObj, nil
}
func convertFromUnstructured(in *unstructured.Unstructured) (*v1alpha1.FlinkJob, error) {
job := &v1alpha1.FlinkJob{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(in.Object, job)
if err != nil {
return nil, nil
}
return job, nil
}

View File

@ -2,11 +2,53 @@ package crd
import (
"context"
"encoding/json"
"flink-kube-operator/internal/crd/v1alpha1"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"gitea.com/logicamp/lc"
"go.uber.org/zap"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func (crd Crd) AddEvent(jobUid types.UID, event string) {
crd.client.UpdateStatus(context.Background(), nil, v1.UpdateOptions{})
func (crd Crd) SetJobStatus(jobUid types.UID, status string) error {
job := GetJob(jobUid)
// Define the patch data (JSON Merge Patch format)
patchData := map[string]interface{}{
"status": v1alpha1.FlinkJobStatus{},
}
patchBytes, err := json.Marshal(patchData)
if err != nil {
return fmt.Errorf("error marshaling patch data: %w", err)
}
// Patch the status subresource
unstructuredJob, err := crd.client.
Namespace(job.GetNamespace()).
Patch(
context.Background(),
job.GetName(),
types.MergePatchType, // Use MergePatchType for JSON Merge Patch
patchBytes,
metaV1.PatchOptions{},
)
if err != nil {
lc.Logger.Error(
"[crd] [status] error patching custom resource status",
zap.String("namespace", job.GetNamespace()),
zap.Error(err),
)
return err
}
patched, err := convertFromUnstructured(unstructuredJob)
if err != nil {
lc.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err))
}
lc.Logger.Debug("[crd] [status] set status", zap.Any("statusUpdateObj", patched))
if err != nil {
lc.Logger.Error("[crd] [status] ", zap.Error(err))
return err
}
return nil
}

View File

@ -12,7 +12,7 @@ type Crd struct {
client dynamic.NamespaceableResourceInterface
}
func New() {
func New() *Crd {
// Get Kubernetes config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
@ -37,4 +37,5 @@ func New() {
// Watch for FlinkJob creation
go crd.watchFlinkJobs()
return &crd
}

View File

@ -15,11 +15,17 @@ type FlinkJobSpec struct {
EntryClass string `json:"entryClass"`
}
type FlinkJobStatus struct {
JobStatus *string `json:"jobStatus,omitempty"`
LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type FlinkJob struct {
metaV1.TypeMeta `json:",inline"`
metaV1.ObjectMeta `json:"metadata,omitempty"`
Spec FlinkJobSpec `json:"spec"`
Status FlinkJobStatus `json:"status"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@ -44,6 +44,8 @@ func (job *ManagedJob) cycle() {
}
return
}
job.crd.SetJobStatus(job.def.UID, string(job.state.Status))
// Check for set running or error state
if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing {
err := job.checkStatus()
@ -72,6 +74,6 @@ func (job *ManagedJob) cycle() {
job.restore()
return
}
lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status)))
lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status)))
}

View File

@ -1,6 +1,7 @@
package managed_job
import (
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
api "github.com/logi-camp/go-flink-client"
@ -13,13 +14,15 @@ type ManagedJob struct {
jarId string
db *buntdb.DB
state *jobState
crd *crd.Crd
}
func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *ManagedJob {
func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob, crd *crd.Crd) *ManagedJob {
job := &ManagedJob{
def: def,
client: client,
db: db,
crd: crd,
}
job.startCycle()
return job

View File

@ -12,7 +12,7 @@ import (
var managedJobs = map[types.UID]managed_job.ManagedJob{}
func Setup(client *api.Client, db *buntdb.DB) {
func Setup(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) {
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
@ -20,7 +20,7 @@ func Setup(client *api.Client, db *buntdb.DB) {
for {
select {
case <-ticker.C:
cycle(client, db)
cycle(client, db, crdInstance)
case <-quit:
ticker.Stop()
return
@ -29,14 +29,14 @@ func Setup(client *api.Client, db *buntdb.DB) {
}()
}
func cycle(client *api.Client, db *buntdb.DB) {
func cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) {
for _, uid := range crd.GetAllJobKeys() {
def := crd.GetJob(uid)
managedJob, ok := managedJobs[uid]
if ok {
managedJob.Update(def)
} else {
managedJob := managed_job.NewManagedJob(client, db, def)
managedJob := managed_job.NewManagedJob(client, db, def, crdInstance)
managedJobs[uid] = *managedJob
}
}