feat(crd): update crd type

This commit is contained in:
Mohamad Khani 2024-11-30 20:52:12 +03:30
parent b0ff04126a
commit 412a5292cb
9 changed files with 43 additions and 32 deletions

View File

@ -1,5 +1,6 @@
{ {
"cSpell.words": [ "cSpell.words": [
"apiextensions",
"deepcopy", "deepcopy",
"flink" "flink"
] ]

View File

@ -13,7 +13,7 @@ spec:
- lfj - lfj
scope: Namespaced scope: Namespaced
versions: versions:
- name: v1beta1 - name: v1alpha1
served: true served: true
storage: true storage: true
schema: schema:
@ -22,15 +22,23 @@ spec:
properties: properties:
spec: spec:
type: object type: object
required:
- key
- jarUri
properties: properties:
jobName: key:
type: string type: string
jobClass: name:
type: string
entryClass:
type: string type: string
parallelism: parallelism:
type: integer type: integer
jarUri: jarUri:
type: string type: string
savepointInterval:
type: string
format: duration
flinkConfiguration: flinkConfiguration:
type: object type: object
additionalProperties: additionalProperties:

View File

@ -1,21 +1,15 @@
# flink-job-instance.yaml # flink-job-instance.yaml
apiVersion: flink.logicamp.tech/v1beta1 apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob kind: FlinkJob
metadata: metadata:
name: my-flink-job name: my-flink-job
namespace: default namespace: default
spec: spec:
jobName: "Word Count Example" key: word-count
jobClass: "org.apache.flink.examples.java.wordcount.WordCount" name: "Word Count Example"
entryClass: "org.apache.flink.examples.java.wordcount.WordCount"
parallelism: 2 parallelism: 2
jarUri: "local:///opt/flink/examples/wordcount.jar" jarUri: "local:///opt/flink/examples/wordcount.jar"
flinkConfiguration: flinkConfiguration:
taskmanager.numberOfTaskSlots: "2" taskmanager.numberOfTaskSlots: "2"
parallelism.default: "2" parallelism.default: "2"
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"

View File

@ -1,12 +0,0 @@
package crd
import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
// Define the FlinkJob resource GVR (Group, Version, Resource)
var flinkJobGVR = schema.GroupVersionResource{
Group: "flink.logicamp.tech",
Version: "v1beta1",
Resource: "flink-jobs",
}

View File

@ -1,6 +1,8 @@
package crd package crd
import ( import (
"flink-kube-operator/internal/crd/v1alpha1"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
@ -27,7 +29,7 @@ func New() {
} }
// Get FlinkJob resource interface // Get FlinkJob resource interface
flinkJobClient := dynamicClient.Resource(flinkJobGVR) flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR)
crd := Crd{ crd := Crd{
client: flinkJobClient, client: flinkJobClient,

View File

@ -7,16 +7,19 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct { type FlinkJobSpec struct {
Name string `json:"name"` Key string `json:"key"`
Parallelism int `json:"parallelism"` Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
} }
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type FlinkJob struct { type FlinkJob struct {
metaV1.TypeMeta `json:",inline"` metaV1.TypeMeta `json:",inline"`
metaV1.ObjectMeta `json:"metadata,omitempty"` metaV1.ObjectMeta `json:"metadata,omitempty"`
Spec FlinkJobSpec `json:"spec"`
Spec FlinkJobSpec `json:"spec"`
} }
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@ -8,9 +8,17 @@ import (
const GroupName = "flink.logicamp.tech" const GroupName = "flink.logicamp.tech"
const GroupVersion = "v1alpha1" const GroupVersion = "v1alpha1"
const ResourceName = "flink-jobs"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion} var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}
// Define the FlinkJob resource GVR (Group, Version, Resource)
var FlinkJobGVR = schema.GroupVersionResource{
Group: GroupName,
Version: GroupVersion,
Resource: ResourceName,
}
var ( var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme AddToScheme = SchemeBuilder.AddToScheme

View File

@ -45,6 +45,8 @@ func (crd Crd) watchFlinkJobs() {
fmt.Printf("New FlinkJob created: %s\n", job.GetName()) fmt.Printf("New FlinkJob created: %s\n", job.GetName())
// Handle the new FlinkJob // Handle the new FlinkJob
handleNewFlinkJob(job) handleNewFlinkJob(job)
case watch.Deleted:
} }
} }
} }
@ -57,6 +59,6 @@ func handleNewFlinkJob(job *v1alpha1.FlinkJob) {
// Process job specification // Process job specification
fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind) fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind)
lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job)) lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job), zap.Any("name", job.Spec.Name))
// Add your custom logic here // Add your custom logic here
} }

View File

@ -0,0 +1,5 @@
package managed_job
func (job *ManagedJob) Stop() {
job.client.StopJob(*job.state.JobId)
}