Compare commits

..

4 Commits

Author SHA1 Message Date
1c32bfbbe0 chore: create index and chart package 2025-04-06 01:53:33 +03:30
f210090dff Merge branch 'feature/new-helm-structure' into HEAD 2025-04-06 01:49:21 +03:30
54008669cb fix(helm): wrong savepoint and checkpoint s3 configs 2025-04-06 01:49:00 +03:30
830e265162 feat: apply new helm structure
use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
2025-04-05 01:39:02 +03:30
27 changed files with 401 additions and 259 deletions

View File

@ -14,6 +14,7 @@
"nindent",
"reactivex",
"repsert",
"taskmanager",
"tolerations"
]
}

View File

@ -2,7 +2,7 @@ FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build
ARG upx_version=4.2.4
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
chmod +x /usr/local/bin/upx && \
@ -27,6 +27,7 @@ FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
COPY --from=build /flink-kube-operator /flink-kube-operator
COPY --from=build /etc/ssl/certs /etc/ssl/certs
EXPOSE 8083

View File

@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17
FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17
# Set working directory
WORKDIR /opt/flink
@ -15,17 +15,18 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.0/flink-sql-jdbc-driver-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

View File

@ -36,6 +36,10 @@ spec:
type: integer
jarUri:
type: string
jarURIBasicAuthUsername:
type: string
jarURIBasicAuthPassword:
type: string
args:
type: array
items:

View File

@ -3,13 +3,14 @@ apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob
metadata:
name: my-flink-job
namespace: default
spec:
key: word-count
name: "Word Count Example"
entryClass: "org.apache.flink.examples.java.wordcount.WordCount"
parallelism: 2
jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar"
entryClass: "tech.logicamp.logiline.FacilityEnrichment"
parallelism: 1
jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
jarURIBasicAuthUsername: <basic-auth-username> # do not commit real credentials in examples
jarURIBasicAuthPassword: <basic-auth-password> # NOTE(review): a live token was committed here — rotate it immediately and load credentials from a Kubernetes Secret instead
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
parallelism.default: "2"
taskmanager.numberOfTaskSlots: "1"
parallelism.default: "1"

6
helm/chart/Chart.lock Normal file
View File

@ -0,0 +1,6 @@
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
generated: "2025-04-04T14:42:09.771390014+03:30"

View File

@ -2,5 +2,9 @@ apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 0.1.14
appVersion: "0.1.0"
version: 1.0.0
appVersion: "0.1.1"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2

Binary file not shown.

View File

@ -17,6 +17,6 @@
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
echo "Visit http://127.0.0.1:8081 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
{{- end }}

View File

@ -0,0 +1,31 @@
{{/*
flink.env — shared environment variables for every Flink container
(job manager and task managers). FLINK_PROPERTIES is consumed by the
Flink docker entrypoint and merged into the effective flink-conf.yaml.
This helper is included with `nindent`, which re-bases the relative
indentation at each call site.

NOTE(review):
  - The two `rootLogger.*` lines are log4j properties, not Flink
    configuration keys, and DEBUG is hardcoded — confirm they belong
    here rather than in log4j-console.properties, and that DEBUG is
    intended outside development.
  - Checkpoints/savepoints target the bundled MinIO (s3://flink) with
    path-style access; RocksDB local dir and the HA storageDir remain
    on pod-local paths. high-availability.type=kubernetes stores leader
    election/metadata via the API server in {{ .Release.Namespace }}.
*/}}
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
  value: "localhost"
- name: NAMESPACE
  value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
  value: |
    jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
    jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
    taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
    taskmanager.data.port: 6125
    taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
    parallelism.default: {{ .Values.flink.parallelism.default }}
    state.backend: {{ .Values.flink.state.backend }}
    rest.port: 8081
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    high-availability.type: kubernetes
    kubernetes.namespace: {{ .Release.Namespace }}
    kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
    execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
    execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
    state.checkpoints.dir: s3://flink/checkpoints/
    state.backend.rocksdb.localdir: /opt/flink/rocksdb
    high-availability.storageDir: /opt/flink/ha
    state.savepoints.dir: s3://flink/savepoints/
    state.backend.incremental: {{ .Values.flink.state.incremental }}
    rest.profiling.enabled: true
    s3.endpoint: http://{{ .Release.Name }}-minio:9000
    s3.path.style.access: true
{{- end }}

View File

@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.data.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml

View File

@ -1,165 +0,0 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- end }}
{{- define "flink.volumeMounts" -}}
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/data
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/rocksdb
subPath: rocksdb
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/checkpoints
subPath: checkpoints
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/web-upload
subPath: web-upload
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
- name: flink-savepoints
mountPath: {{ .Values.flink.state.savepoints.dir }}
{{- end }}
{{- define "flink.volumes" -}}
- name: flink-data
persistentVolumeClaim:
claimName: {{ .Values.flink.state.data.pvcName }}
- name: flink-savepoints
persistentVolumeClaim:
claimName: {{ .Values.flink.state.savepoints.pvcName }}
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Values.flink.state.ha.pvcName }}
{{- end }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
template:
metadata:
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"]
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: taskmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
ports:
- containerPort: 6121 # TaskManager data port
name: data
- containerPort: 6122 # TaskManager RPC port
name: rpc
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: localhost:8081
- name: SAVEPOINT_PATH
value: file://{{ .Values.flink.state.savepoints.dir }}
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
volumes:
{{- include "flink.volumes" . | nindent 8 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.ha.pvcName }}
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
spec:
accessModes:
- ReadWriteOnce

View File

@ -0,0 +1,84 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000" # "minio-service" does not exist in this chart; the MinIO subchart Service is {{ .Release.Name }}-minio, as used by flink.env and the operator
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets # NOTE(review): the operator reads {{ .Release.Name }}-minio (keys root-user/root-password); confirm a flink-secrets Secret actually exists or align with the subchart secret
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -0,0 +1,28 @@
{{/*
ClusterIP Service fronting the Flink JobManager: web UI (8081),
RPC (6123), blob server (6124), queryable state (6125).
NOTE(review): port 3000 ("operator") routes to the job-manager pods,
but the operator now runs in its own StatefulSet behind its own
Service — confirm this port mapping is still needed here.
*/}}
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}-flink-job-manager
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
    app.kubernetes.io/instance: {{ .Release.Name }}
spec:
  ports:
    - name: flink-web-ui
      port: 8081
      targetPort: 8081
    - name: rpc
      port: 6123
      targetPort: 6123
    - name: blob
      port: 6124
      targetPort: 6124
    - name: query
      port: 6125
      targetPort: 6125
    - name: operator
      port: 3000
      targetPort: 3000
  # Must match the labels on the job-manager Deployment's pod template.
  selector:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-job-manager
  type: ClusterIP # Change to LoadBalancer if you want external access

View File

@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.savepoints.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml

View File

@ -1,19 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- port: 8081
name: flink-web-ui
targetPort: 8081
- port: 3000
name: operator
targetPort: 3000
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@ -0,0 +1,58 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-task-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager # align the StatefulSet's own label with its selector/template labels ("taskmanager" disagreed with them)
spec:
serviceName: {{ .Release.Name }}-flink-task-manager
replicas: {{ .Values.flink.taskManager.replicas }}
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: task-manager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
env:
{{- include "flink.env" . | nindent 8 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000" # "minio-service" does not exist in this chart; the MinIO subchart Service is {{ .Release.Name }}-minio, as used by flink.env and the operator
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets # NOTE(review): the operator reads {{ .Release.Name }}-minio (keys root-user/root-password); confirm a flink-secrets Secret actually exists or align with the subchart secret
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage
spec:
accessModes: [ ReadWriteOnce ]
resources:
requests:
storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}

View File

@ -1,9 +1,10 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "flink-kube-operator.fullname" . }}
name: {{ .Release.Name }}-flink-operator
labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }}
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
type: {{ .Values.service.type }}
ports:
@ -12,4 +13,5 @@ spec:
protocol: TCP
name: http
selector:
{{- include "flink-kube-operator.selectorLabels" . | nindent 4 }}
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator

View File

@ -0,0 +1,66 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-operator
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceName: {{ .Release.Name }}-flink-operator
replicas: 1
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: wait-for-jobmanager
image: curlimages/curl:8.5.0 # Lightweight curl image
command:
- sh
- -c
- |
echo "Waiting for Flink JobManager to be ready..."
until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
echo "JobManager not ready yet - retrying in 5s..."
sleep 5
done
echo "JobManager is ready!"
containers:
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Release.Name }}-flink-job-manager:8081
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-user
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-password

View File

@ -117,7 +117,7 @@ affinity: {}
flink:
image:
repository: lcr.logicamp.tech/library/flink
tag: 1.20.0-scala_2.12-java17-minicluster
tag: 1.20.1-scala_2.12-java17-minicluster
parallelism:
default: 1 # Default parallelism for Flink jobs
@ -129,14 +129,6 @@ flink:
state:
backend: rocksdb # Use RocksDB for state backend
incremental: true
savepoints:
dir: "/opt/flink/savepoints" # Directory to store savepoints
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
size: 10Gi # PVC size for savepoints storage
data:
dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
ha:
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
@ -149,5 +141,14 @@ flink:
taskManager:
numberOfTaskSlots: 12 # Number of task slots for TaskManager
processMemory: 4096m # Size of task manager process memory
# clusterId: some-id
replicas: 1
storage:
rocksDb:
size: 4Gi
resources:
limits:
cpu: 3
memory: 4Gi
requests:
cpu: 1
memory: 2Gi

Binary file not shown.

View File

@ -1,6 +1,20 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30"
@ -53,7 +67,7 @@ entries:
version: 0.1.10
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.975218534+03:30"
created: "2025-03-04T18:04:35.495842696+03:30"
description: Helm chart for flink kube operator
digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
name: flink-kube-operator
@ -63,7 +77,7 @@ entries:
version: 0.1.9
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.974750898+03:30"
created: "2025-03-04T18:04:35.495392608+03:30"
description: Helm chart for flink kube operator
digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
name: flink-kube-operator
@ -73,7 +87,7 @@ entries:
version: 0.1.8
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.974306458+03:30"
created: "2025-03-04T18:04:35.494948853+03:30"
description: Helm chart for flink kube operator
digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
name: flink-kube-operator
@ -83,7 +97,7 @@ entries:
version: 0.1.7
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.973833587+03:30"
created: "2025-03-04T18:04:35.49450822+03:30"
description: Helm chart for flink kube operator
digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
name: flink-kube-operator
@ -93,7 +107,7 @@ entries:
version: 0.1.6
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.972800097+03:30"
created: "2025-03-04T18:04:35.494040193+03:30"
description: Helm chart for flink kube operator
digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
name: flink-kube-operator
@ -103,7 +117,7 @@ entries:
version: 0.1.5
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.972374168+03:30"
created: "2025-03-04T18:04:35.493584927+03:30"
description: Helm chart for flink kube operator
digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
name: flink-kube-operator
@ -113,7 +127,7 @@ entries:
version: 0.1.4
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.971952322+03:30"
created: "2025-03-04T18:04:35.493138547+03:30"
description: Helm chart for flink kube operator
digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
name: flink-kube-operator
@ -123,7 +137,7 @@ entries:
version: 0.1.3
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.971461428+03:30"
created: "2025-03-04T18:04:35.492696005+03:30"
description: Helm chart for flink kube operator
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
name: flink-kube-operator
@ -133,7 +147,7 @@ entries:
version: 0.1.2
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.968770748+03:30"
created: "2025-03-04T18:04:35.490170385+03:30"
description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator
@ -143,7 +157,7 @@ entries:
version: 0.1.1
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.968266924+03:30"
created: "2025-03-04T18:04:35.489734651+03:30"
description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator
@ -151,4 +165,4 @@ entries:
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2025-04-04T13:50:27.967565847+03:30"
generated: "2025-04-06T01:52:09.466886557+03:30"

View File

@ -14,6 +14,8 @@ type FlinkJobSpec struct {
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername,omitempty"` // optional: omit from serialized spec when unset
JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword,omitempty"` // optional: omit from serialized spec when unset
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`

View File

@ -2,10 +2,12 @@ package jar
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"os"
"strings"
@ -18,11 +20,15 @@ import (
type JarFile struct {
uri string
filePath string
basicAuthUsername *string
basicAuthPassword *string
}
func NewJarFile(URI string) (*JarFile, error) {
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
jarFile := &JarFile{
uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
}
err := jarFile.Download()
if err != nil {
@ -57,11 +63,47 @@ func (jarFile *JarFile) Download() error {
}
defer out.Close()
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 {
var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
cookieJar, _ := cookiejar.New(nil) // cookiejar.New never fails with nil options; a zero-value &cookiejar.Jar{} is unusable (nil internal map — SetCookies panics on the first Set-Cookie)
client := &http.Client{
	Jar:           cookieJar,
	CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
defer resp.Body.Close() // close unconditionally — previously the body leaked on the error-status path (defer sat after the return)
if resp.StatusCode > 299 {
	respBody, _ := io.ReadAll(resp.Body) // Read into a zero-length slice reads 0 bytes, so the error message body was always empty
	jarFile.delete()                     // remove the partially-created file, matching the other error paths
	err = errors.New(string(respBody) + " status:" + resp.Status)
	pkg.Logger.Error("error in downloading jar", zap.Error(err))
	return err
}
_, err = io.Copy(out, resp.Body)

View File

@ -9,7 +9,7 @@ import (
// upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err