From 412a5292cb1395bc8afc450df9e0e4a849f568f5 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sat, 30 Nov 2024 20:52:12 +0330 Subject: [PATCH] feat(crd): update crd type --- .vscode/settings.json | 1 + crds.yaml | 14 +++++++++++--- example-job.yaml | 16 +++++----------- internal/crd/crd.type.go | 12 ------------ internal/crd/new.go | 4 +++- internal/crd/v1alpha1/flink_job.go | 11 +++++++---- internal/crd/v1alpha1/register.go | 8 ++++++++ internal/crd/watch.go | 4 +++- internal/managed_job/remove.go | 5 +++++ 9 files changed, 43 insertions(+), 32 deletions(-) delete mode 100644 internal/crd/crd.type.go create mode 100644 internal/managed_job/remove.go diff --git a/.vscode/settings.json b/.vscode/settings.json index fa2ce73..56c0029 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,6 @@ { "cSpell.words": [ + "apiextensions", "deepcopy", "flink" ] diff --git a/crds.yaml b/crds.yaml index 0e41a08..1574e3c 100644 --- a/crds.yaml +++ b/crds.yaml @@ -13,7 +13,7 @@ spec: - lfj scope: Namespaced versions: - - name: v1beta1 + - name: v1alpha1 served: true storage: true schema: @@ -22,15 +22,23 @@ spec: properties: spec: type: object + required: + - key + - jarUri properties: - jobName: + key: type: string - jobClass: + name: + type: string + entryClass: type: string parallelism: type: integer jarUri: type: string + savepointInterval: + type: string + format: duration flinkConfiguration: type: object additionalProperties: diff --git a/example-job.yaml b/example-job.yaml index e853453..81a90e2 100644 --- a/example-job.yaml +++ b/example-job.yaml @@ -1,21 +1,15 @@ # flink-job-instance.yaml -apiVersion: flink.logicamp.tech/v1beta1 +apiVersion: flink.logicamp.tech/v1alpha1 kind: FlinkJob metadata: name: my-flink-job namespace: default spec: - jobName: "Word Count Example" - jobClass: "org.apache.flink.examples.java.wordcount.WordCount" + key: word-count + name: "Word Count Example" + entryClass: "org.apache.flink.examples.java.wordcount.WordCount" parallelism: 2 jarUri: "local:///opt/flink/examples/wordcount.jar" flinkConfiguration: taskmanager.numberOfTaskSlots: "2" - parallelism.default: "2" - resources: - requests: - memory: "2Gi" - cpu: "1" - limits: - memory: "4Gi" - cpu: "2" \ No newline at end of file + parallelism.default: "2" \ No newline at end of file diff --git a/internal/crd/crd.type.go b/internal/crd/crd.type.go deleted file mode 100644 index dde50c8..0000000 --- a/internal/crd/crd.type.go +++ /dev/null @@ -1,12 +0,0 @@ -package crd - -import ( - "k8s.io/apimachinery/pkg/runtime/schema" -) - -// Define the FlinkJob resource GVR (Group, Version, Resource) -var flinkJobGVR = schema.GroupVersionResource{ - Group: "flink.logicamp.tech", - Version: "v1beta1", - Resource: "flink-jobs", -} diff --git a/internal/crd/new.go b/internal/crd/new.go index c4fe3bb..bbcc7f2 100644 --- a/internal/crd/new.go +++ b/internal/crd/new.go @@ -1,6 +1,8 @@ package crd import ( + "flink-kube-operator/internal/crd/v1alpha1" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -27,7 +29,7 @@ func New() { } // Get FlinkJob resource interface - flinkJobClient := dynamicClient.Resource(flinkJobGVR) + flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR) crd := Crd{ client: flinkJobClient, diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 5ef872d..e65b403 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -7,16 +7,19 @@ import ( //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE type FlinkJobSpec struct { - Name string `json:"name"` - Parallelism int `json:"parallelism"` + Key string `json:"key"` + Name string `json:"name"` + Parallelism int `json:"parallelism"` + JarURI string `json:"jarUri"` + SavepointInterval metaV1.Duration `json:"savepointInterval"` + EntryClass string `json:"entryClass"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type FlinkJob struct { metaV1.TypeMeta `json:",inline"` metaV1.ObjectMeta `json:"metadata,omitempty"` - - Spec FlinkJobSpec `json:"spec"` + Spec FlinkJobSpec `json:"spec"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/internal/crd/v1alpha1/register.go b/internal/crd/v1alpha1/register.go index 16df89d..68de54d 100644 --- a/internal/crd/v1alpha1/register.go +++ b/internal/crd/v1alpha1/register.go @@ -8,9 +8,17 @@ import ( const GroupName = "flink.logicamp.tech" const GroupVersion = "v1alpha1" +const ResourceName = "flink-jobs" var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion} +// Define the FlinkJob resource GVR (Group, Version, Resource) +var FlinkJobGVR = schema.GroupVersionResource{ + Group: GroupName, + Version: GroupVersion, + Resource: ResourceName, +} + var ( SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) AddToScheme = SchemeBuilder.AddToScheme diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 050f54d..5a19b4f 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -45,6 +45,8 @@ func (crd Crd) watchFlinkJobs() { fmt.Printf("New FlinkJob created: %s\n", job.GetName()) // Handle the new FlinkJob handleNewFlinkJob(job) + case watch.Deleted: + } } } @@ -57,6 +59,6 @@ func handleNewFlinkJob(job *v1alpha1.FlinkJob) { // Process job specification fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind) - lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job)) + lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job), zap.Any("name", job.Spec.Name)) // Add your custom logic here } diff --git a/internal/managed_job/remove.go b/internal/managed_job/remove.go new file mode 100644 index 0000000..64ad4bc --- /dev/null +++ b/internal/managed_job/remove.go @@ -0,0 +1,5 @@ +package managed_job + +func (job *ManagedJob) Stop() { + job.client.StopJob(*job.state.JobId) +}