Compare commits

...

15 Commits

Author SHA1 Message Date
5ca1c28b33 fix(helm): wrong savepoint path config when storage-type is filesystem 2025-07-18 18:12:21 +03:30
d73292ac54 fix: resolve missing task manager statefulset savepoint pvc mount 2025-05-17 14:35:22 +03:30
f0df5ff937 fix: wrong fieldPath in task-manager statefulset spec.hostname 2025-05-17 14:02:01 +03:30
83c4b5ded2 feat(helm): add filesystem savepoint storage mode 2025-05-17 13:02:24 +03:30
89647f3b5b fix(helm): add flink taskmanager host env to task manager 2025-04-15 12:08:17 +03:30
dedbe00fba fix(helm): wrong checkpoint path flink properties 2025-04-13 10:38:15 +03:30
62c340bc64 feat(helm): add filesystem checkpoint storage mode 2025-04-13 10:00:32 +03:30
44ff3627fc feat(helm): add flink properties variable to values 2025-04-12 23:14:52 +03:30
392004d99a ci(docker): add zstd dependency jar to flink docker file 2025-04-12 23:07:53 +03:30
22c7d712f4 feat: update flink http client library 2025-04-07 13:20:39 +03:30
2dd625ec7c feat: update flink http client library 2025-04-07 11:28:33 +03:30
c991215a9d Merge branch 'main' of https://git.logicamp.tech/Logicamp/flink-kube-operator 2025-04-06 08:48:54 +03:30
1c32bfbbe0 chore: create index and chart package 2025-04-06 01:53:33 +03:30
f210090dff Merge branch 'feature/new-helm-structure' into HEAD 2025-04-06 01:49:21 +03:30
556d9ff6af fix: wong update status in some situations 2025-03-05 11:40:22 +03:30
25 changed files with 214 additions and 44 deletions

View File

@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build
FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
ARG upx_version=4.2.4

View File

@ -27,6 +27,7 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

2
go.sum
View File

@ -64,6 +64,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=

View File

@ -2,9 +2,5 @@ apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 0.1.14
appVersion: "0.1.0"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
version: 1.2.3
appVersion: "0.1.1"

Binary file not shown.

View File

@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@ -18,14 +18,26 @@
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/savepoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- end }}
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
name: {{ .Release.Name }}-flink-ha-pvc
spec:
accessModes:
- ReadWriteOnce

View File

@ -49,6 +49,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
@ -61,15 +62,32 @@ spec:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@ -31,6 +31,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
@ -43,11 +44,32 @@ spec:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
volumes:
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage

View File

@ -46,10 +46,11 @@ spec:
env:
- name: FLINK_API_URL
value: {{ .Release.Name }}-flink-job-manager:8081
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
{{- if eq .Values.flink.state.savepoint.storageType "s3" }}
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000"
- name: AWS_ACCESS_KEY_ID
@ -62,5 +63,8 @@ spec:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-password
{{- else }}
- name: SAVEPOINT_PATH
value: /opt/flink/savepoints/
{{- end }}

View File

@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {}
# fsGroup: 2000
podSecurityContext: {} # fsGroup: 2000
securityContext: {}
# capabilities:
@ -64,10 +63,10 @@ ingress:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
@ -106,7 +105,6 @@ autoscaling:
config:
flinkApiUrl: flink:8081
nodeSelector: {}
tolerations: []
@ -120,35 +118,41 @@ flink:
tag: 1.20.1-scala_2.12-java17-minicluster
parallelism:
default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
default: 1 # Default parallelism for Flink jobs
state:
backend: rocksdb # Use RocksDB for state backend
backend: rocksdb # Use RocksDB for state backend
incremental: true
ha:
dir: "/opt/flink/ha" # Directory to store ha data
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager:
processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager:
numberOfTaskSlots: 12 # Number of task slots for TaskManager
numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory
replicas: 1
replicas: 1 # Number of task manager replicas
storage:
rocksDb:
size: 4Gi
resources:
resources:
limits:
cpu: 3
memory: 4Gi
requests:
cpu: 1
memory: 2Gi
memory: 2Gi

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,6 +1,92 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-07-18T18:09:46.27166563+03:30"
description: Helm chart for flink kube operator
digest: 597f2c07884bb5411dcc6e1a9cdf7672977858efe30273a46fb6525eb6013091
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.3.tgz
version: 1.2.3
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:34:55.317942453+03:30"
description: Helm chart for flink kube operator
digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.2.tgz
version: 1.2.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:01:29.891695937+03:30"
description: Helm chart for flink kube operator
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.1.tgz
version: 1.2.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30"
@ -151,4 +237,4 @@ entries:
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2025-04-04T13:50:27.967565847+03:30"
generated: "2025-07-18T18:09:46.244672127+03:30"

View File

@ -13,7 +13,7 @@ func (job *ManagedJob) Cycle() {
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
// Init job
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
job.Run(false)
return
}

View File

@ -117,14 +117,15 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
"status": patchStatusObj,
})
} else {
patchStatusObj := map[string]interface{}{
"jobStatus": "",
"lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
}
// TODO handle job not found status
// patchStatusObj := map[string]interface{}{
// "jobStatus": "",
// "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
// }
crdInstance.Patch(uid, map[string]interface{}{
"status": patchStatusObj,
})
// crdInstance.Patch(uid, map[string]interface{}{
// "status": patchStatusObj,
// })
}
managedJob.Cycle()