diff --git a/.vscode/settings.json b/.vscode/settings.json index 37077b5..54a8d12 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -14,6 +14,7 @@ "nindent", "reactivex", "repsert", + "taskmanager", "tolerations" ] } \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index beb8f96..3ad486a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build ARG upx_version=4.2.4 -RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \ +RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \ curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \ cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \ chmod +x /usr/local/bin/upx && \ @@ -27,6 +27,7 @@ FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final COPY --from=build /flink-kube-operator /flink-kube-operator +COPY --from=build /etc/ssl/certs /etc/ssl/certs EXPOSE 8083 diff --git a/Dockerfile.flink b/Dockerfile.flink index 584fb83..8a10ef3 100644 --- a/Dockerfile.flink +++ b/Dockerfile.flink @@ -1,4 +1,4 @@ -FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17 +FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17 # Set working directory WORKDIR /opt/flink @@ -15,17 +15,18 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/ -RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/ -RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/ +RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/ +RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/ RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/ -RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.0/flink-sql-jdbc-driver-1.20.0.jar -P /opt/flink/lib/ +RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/ RUN wget -q 
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/ +RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/ # Command to start Flink JobManager and TaskManager in a mini-cluster setup CMD ["bin/start-cluster.sh"] \ No newline at end of file diff --git a/crds.yaml b/crds.yaml index c044377..5de3b21 100644 --- a/crds.yaml +++ b/crds.yaml @@ -36,6 +36,10 @@ spec: type: integer jarUri: type: string + jarURIBasicAuthUsername: + type: string + jarURIBasicAuthPassword: + type: string args: type: array items: diff --git a/example-job.yaml b/example-job.yaml index ebd74a2..a1b3068 100644 --- a/example-job.yaml +++ b/example-job.yaml @@ -3,13 +3,14 @@ apiVersion: flink.logicamp.tech/v1alpha1 kind: FlinkJob metadata: name: my-flink-job - namespace: default spec: key: word-count name: "Word Count Example" - entryClass: "org.apache.flink.examples.java.wordcount.WordCount" - parallelism: 2 - jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar" + entryClass: "tech.logicamp.logiline.FacilityEnrichment" + parallelism: 1 + jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar" + jarURIBasicAuthUsername: logiline-actrunner + jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong flinkConfiguration: - taskmanager.numberOfTaskSlots: "2" - parallelism.default: "2" \ No newline at end of file + taskmanager.numberOfTaskSlots: "1" + parallelism.default: "1" \ No newline at end of file diff --git a/helm/chart/Chart.lock b/helm/chart/Chart.lock new file mode 100644 index 0000000..f6368ff --- /dev/null +++ b/helm/chart/Chart.lock @@ -0,0 +1,6 @@ +dependencies: +- name: minio + repository: https://charts.bitnami.com/bitnami + version: 16.0.2 +digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3 +generated: "2025-04-04T14:42:09.771390014+03:30" diff --git a/helm/chart/Chart.yaml b/helm/chart/Chart.yaml index 9fc1a17..7bf5006 100644 --- a/helm/chart/Chart.yaml +++ b/helm/chart/Chart.yaml @@ -4,3 +4,7 @@ description: Helm chart for flink kube operator type: application version: 0.1.14 appVersion: "0.1.0" +dependencies: + - name: minio + repository: https://charts.bitnami.com/bitnami + version: 16.0.2 \ No newline at end of file diff --git a/helm/chart/charts/minio-16.0.2.tgz b/helm/chart/charts/minio-16.0.2.tgz new file mode 100644 index 0000000..624d25b Binary files /dev/null and b/helm/chart/charts/minio-16.0.2.tgz differ diff --git a/helm/chart/templates/NOTES.txt b/helm/chart/templates/NOTES.txt index 8236e73..6d4a21d 100644 --- a/helm/chart/templates/NOTES.txt +++ b/helm/chart/templates/NOTES.txt @@ -17,6 +17,6 @@ {{- else if contains "ClusterIP" .Values.service.type }} export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . 
}},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") - echo "Visit http://127.0.0.1:8080 to use your application" - kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT + echo "Visit http://127.0.0.1:8081 to use your application" + kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT {{- end }} diff --git a/helm/chart/templates/flink/config.yaml b/helm/chart/templates/flink/config.yaml new file mode 100644 index 0000000..d128807 --- /dev/null +++ b/helm/chart/templates/flink/config.yaml @@ -0,0 +1,31 @@ +{{- define "flink.env" -}} +- name: JOB_MANAGER_RPC_ADDRESS + value: "localhost" +- name: NAMESPACE + value: {{ .Release.Namespace }} +- name: FLINK_PROPERTIES + value: | + jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager + jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }} + taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }} + taskmanager.data.port: 6125 + taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }} + parallelism.default: {{ .Values.flink.parallelism.default }} + state.backend: {{ .Values.flink.state.backend }} + rest.port: 8081 + rootLogger.level = DEBUG + rootLogger.appenderRef.console.ref = ConsoleAppender + high-availability.type: kubernetes + kubernetes.namespace: {{ .Release.Namespace }} + kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }} + execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }} + execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }} + state.checkpoints.dir: s3://flink/checkpoints/ + state.backend.rocksdb.localdir: /opt/flink/rocksdb + high-availability.storageDir: /opt/flink/ha + state.savepoints.dir: s3://flink/savepoints/ + state.backend.incremental: {{ .Values.flink.state.incremental }} + rest.profiling.enabled: true + s3.endpoint: http://{{ .Release.Name }}-minio:9000 + s3.path.style.access: true +{{- end }} diff --git a/helm/chart/templates/flink/data.pvc.yaml b/helm/chart/templates/flink/data.pvc.yaml deleted file mode 100644 index e09bdea..0000000 --- a/helm/chart/templates/flink/data.pvc.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: {{ .Values.flink.state.data.pvcName }} -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml diff --git a/helm/chart/templates/flink/deploy.yaml b/helm/chart/templates/flink/deploy.yaml deleted file mode 100644 index 5a7c74e..0000000 --- a/helm/chart/templates/flink/deploy.yaml +++ /dev/null @@ -1,165 +0,0 @@ -{{- define "flink.env" -}} -- name: JOB_MANAGER_RPC_ADDRESS - value: "localhost" -- name: NAMESPACE - value: {{ .Release.Namespace }} -- name: FLINK_PROPERTIES - value: | - jobmanager.rpc.address: localhost - jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }} - taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }} - taskmanager.data.port: 6125 - taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }} - parallelism.default: {{ .Values.flink.parallelism.default }} - state.backend: {{ .Values.flink.state.backend }} - rest.port: 8081 - rootLogger.level = DEBUG - 
rootLogger.appenderRef.console.ref = ConsoleAppender - high-availability.type: kubernetes - kubernetes.namespace: {{ .Release.Namespace }} - kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }} - execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }} - execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }} - web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload - state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints - state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb - high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }} - state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }} - state.backend.incremental: {{ .Values.flink.state.incremental }} - rest.profiling.enabled: true -{{- end }} - -{{- define "flink.volumeMounts" -}} -- name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/data -- name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/rocksdb - subPath: rocksdb -- name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/checkpoints - subPath: checkpoints -- name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/web-upload - subPath: web-upload -- name: flink-ha - mountPath: {{ .Values.flink.state.ha.dir }} -- name: flink-savepoints - mountPath: {{ .Values.flink.state.savepoints.dir }} -{{- end }} - -{{- define "flink.volumes" -}} -- name: flink-data - persistentVolumeClaim: - claimName: {{ .Values.flink.state.data.pvcName }} -- name: flink-savepoints - persistentVolumeClaim: - claimName: {{ .Values.flink.state.savepoints.pvcName }} -- name: flink-ha - persistentVolumeClaim: - claimName: {{ .Values.flink.state.ha.pvcName }} -{{- end }} - -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ .Release.Name }}-flink - labels: - app.kubernetes.io/name: {{ .Release.Name }}-flink - app.kubernetes.io/instance: {{ .Release.Name }} -spec: - replicas: 1 - strategy: - type: Recreate - selector: - matchLabels: - app.kubernetes.io/name: {{ .Release.Name }}-flink - app.kubernetes.io/instance: {{ .Release.Name }} - template: - metadata: - labels: - app.kubernetes.io/name: {{ .Release.Name }}-flink - app.kubernetes.io/instance: {{ .Release.Name }} - spec: - serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} - initContainers: - - name: volume-mount-hack - image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} - runAsUser: 0 - command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"] - volumeMounts: - {{- include "flink.volumeMounts" . | nindent 12 }} - containers: - - name: jobmanager - image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} - imagePullPolicy: Always - args: ["jobmanager"] - ports: - - containerPort: 6123 # JobManager RPC port - name: rpc - - containerPort: 6124 # JobManager blob server port - name: blob - - containerPort: 6125 # JobManager queryable state port - name: query - - containerPort: 8081 # JobManager Web UI port - name: ui - env: - {{- include "flink.env" . | nindent 12 }} - - name: POD_IP - valueFrom: - fieldRef: - fieldPath: status.podIP - volumeMounts: - {{- include "flink.volumeMounts" . 
| nindent 12 }} - - name: taskmanager - image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} - imagePullPolicy: Always - args: ["taskmanager"] - ports: - - containerPort: 6121 # TaskManager data port - name: data - - containerPort: 6122 # TaskManager RPC port - name: rpc - env: - {{- include "flink.env" . | nindent 12 }} - - name: POD_IP - valueFrom: - fieldRef: - fieldPath: status.podIP - volumeMounts: - {{- include "flink.volumeMounts" . | nindent 12 }} - - name: operator - securityContext: - {{- toYaml .Values.securityContext | nindent 12 }} - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - ports: - - name: http - containerPort: {{ .Values.service.port }} - protocol: TCP - env: - - name: FLINK_API_URL - value: localhost:8081 - - name: SAVEPOINT_PATH - value: file://{{ .Values.flink.state.savepoints.dir }} - - name: NAMESPACE - value: "{{ .Release.Namespace }}" - - resources: - {{- toYaml .Values.resources | nindent 12 }} - volumeMounts: - {{- include "flink.volumeMounts" . | nindent 12 }} - volumes: - {{- include "flink.volumes" . | nindent 8 }} - - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} diff --git a/helm/chart/templates/flink/ha.pvc.yaml b/helm/chart/templates/flink/ha.pvc.yaml index fcff631..ee3b672 100644 --- a/helm/chart/templates/flink/ha.pvc.yaml +++ b/helm/chart/templates/flink/ha.pvc.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: PersistentVolumeClaim metadata: - name: {{ .Values.flink.state.ha.pvcName }} + name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} spec: accessModes: - ReadWriteOnce diff --git a/helm/chart/templates/flink/job-manager-deploy.yaml b/helm/chart/templates/flink/job-manager-deploy.yaml new file mode 100644 index 0000000..017bef4 --- /dev/null +++ b/helm/chart/templates/flink/job-manager-deploy.yaml @@ -0,0 +1,84 @@ + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Release.Name }}-flink-job-manager + labels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-job-manager +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-job-manager + template: + metadata: + labels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-job-manager + spec: + serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} + initContainers: + - name: volume-mount-hack + image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} + runAsUser: 0 + command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"] + volumeMounts: + - name: flink-ha + mountPath: {{ .Values.flink.state.ha.dir }} + containers: + - name: jobmanager + image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} + imagePullPolicy: Always + args: ["jobmanager"] + ports: + - containerPort: 6123 # JobManager RPC port + name: rpc + - containerPort: 6124 # JobManager blob server port + name: blob + - containerPort: 6125 # JobManager queryable state port + name: query + - containerPort: 8081 # JobManager Web UI port + name: ui + env: + {{- include "flink.env" . 
| nindent 12 }} + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: S3_ENDPOINT + value: "http://minio-service:9000" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-flink-secrets + key: minio_access_key + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-flink-secrets + key: minio_secret_key + volumeMounts: + - name: flink-ha + mountPath: {{ .Values.flink.state.ha.dir }} + + volumes: + - name: flink-ha + persistentVolumeClaim: + claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} + + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/helm/chart/templates/flink/job-manager-service.yaml b/helm/chart/templates/flink/job-manager-service.yaml new file mode 100644 index 0000000..c7d3af3 --- /dev/null +++ b/helm/chart/templates/flink/job-manager-service.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Release.Name }}-flink-job-manager + labels: + app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager + app.kubernetes.io/instance: {{ .Release.Name }} +spec: + ports: + - name: flink-web-ui + port: 8081 + targetPort: 8081 + - name: rpc + port: 6123 + targetPort: 6123 + - name: blob + port: 6124 + targetPort: 6124 + - name: query + port: 6125 + targetPort: 6125 + - name: operator + port: 3000 + targetPort: 3000 + selector: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-job-manager + type: ClusterIP # Change to LoadBalancer if you want external access diff --git a/helm/chart/templates/flink/savepoint.pvc.yaml b/helm/chart/templates/flink/savepoint.pvc.yaml deleted file mode 100644 index 6906af1..0000000 --- a/helm/chart/templates/flink/savepoint.pvc.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: {{ .Values.flink.state.savepoints.pvcName }} -spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml diff --git a/helm/chart/templates/flink/service.yaml b/helm/chart/templates/flink/service.yaml deleted file mode 100644 index 5a7cc1b..0000000 --- a/helm/chart/templates/flink/service.yaml +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: flink - labels: - app.kubernetes.io/name: {{ .Release.Name }}-flink - app.kubernetes.io/instance: {{ .Release.Name }} -spec: - ports: - - port: 8081 - name: flink-web-ui - targetPort: 8081 - - port: 3000 - name: operator - targetPort: 3000 - selector: - app.kubernetes.io/name: {{ .Release.Name }}-flink - app.kubernetes.io/instance: {{ .Release.Name }} - type: ClusterIP # Change to LoadBalancer if you want external access diff --git a/helm/chart/templates/flink/task-manager-statefulset.yaml b/helm/chart/templates/flink/task-manager-statefulset.yaml new file mode 100644 index 0000000..cd260a9 --- /dev/null +++ b/helm/chart/templates/flink/task-manager-statefulset.yaml @@ -0,0 +1,58 @@ + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ .Release.Name }}-flink-task-manager + labels: + app: {{ .Release.Name }}-flink-operator + component: taskmanager +spec: + serviceName: {{ .Release.Name }}-flink-task-manager + replicas: {{ .Values.flink.taskManager.replicas }} + selector: + 
matchLabels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-task-manager + template: + metadata: + labels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-task-manager + spec: + serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} + containers: + - name: task-manager + image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} + imagePullPolicy: Always + args: ["taskmanager"] + env: + {{- include "flink.env" . | nindent 8 }} + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: S3_ENDPOINT + value: "http://minio-service:9000" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-flink-secrets + key: minio_access_key + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-flink-secrets + key: minio_secret_key + volumeMounts: + - name: rocksdb-storage + mountPath: /opt/flink/rocksdb + resources: + {{- toYaml .Values.flink.taskManager.resources | nindent 10 }} + volumeClaimTemplates: + - metadata: + name: rocksdb-storage + spec: + accessModes: [ ReadWriteOnce ] + resources: + requests: + storage: {{ .Values.flink.taskManager.storage.rocksDb.size }} diff --git a/helm/chart/templates/operator/deployment.yaml b/helm/chart/templates/operator/deployment.yaml deleted file mode 100644 index e69de29..0000000 diff --git a/helm/chart/templates/operator/service.yaml b/helm/chart/templates/operator/service.yaml index 887cfb0..cef229a 100644 --- a/helm/chart/templates/operator/service.yaml +++ b/helm/chart/templates/operator/service.yaml @@ -1,9 +1,10 @@ apiVersion: v1 kind: Service metadata: - name: {{ include "flink-kube-operator.fullname" . }} + name: {{ .Release.Name }}-flink-operator labels: - {{- include "flink-kube-operator.labels" . | nindent 4 }} + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-operator spec: type: {{ .Values.service.type }} ports: @@ -12,4 +13,5 @@ spec: protocol: TCP name: http selector: - {{- include "flink-kube-operator.selectorLabels" . | nindent 4 }} + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-operator diff --git a/helm/chart/templates/operator/statefulset.yaml b/helm/chart/templates/operator/statefulset.yaml new file mode 100644 index 0000000..c50f097 --- /dev/null +++ b/helm/chart/templates/operator/statefulset.yaml @@ -0,0 +1,66 @@ + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ .Release.Name }}-flink-operator + labels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-operator +spec: + serviceName: {{ .Release.Name }}-flink-operator + replicas: 1 + selector: + matchLabels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-operator + template: + metadata: + labels: + app: {{ .Release.Name }}-flink-operator + component: {{ .Release.Name }}-flink-operator + spec: + serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} + initContainers: + - name: wait-for-jobmanager + image: curlimages/curl:8.5.0 # Lightweight curl image + command: + - sh + - -c + - | + echo "Waiting for Flink JobManager to be ready..." + until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do + echo "JobManager not ready yet - retrying in 5s..." + sleep 5 + done + echo "JobManager is ready!" 
+ containers: + - name: operator + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: http + containerPort: {{ .Values.service.port }} + protocol: TCP + env: + - name: FLINK_API_URL + value: {{ .Release.Name }}-flink-job-manager:8081 + - name: SAVEPOINT_PATH + value: s3://flink/savepoints/ + - name: NAMESPACE + value: "{{ .Release.Namespace }}" + - name: S3_ENDPOINT + value: "http://{{ .Release.Name }}-minio:9000" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-minio + key: root-user + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-minio + key: root-password + + diff --git a/helm/chart/values.yaml b/helm/chart/values.yaml index a9ec791..0bf2e4a 100644 --- a/helm/chart/values.yaml +++ b/helm/chart/values.yaml @@ -117,7 +117,7 @@ affinity: {} flink: image: repository: lcr.logicamp.tech/library/flink - tag: 1.20.0-scala_2.12-java17-minicluster + tag: 1.20.1-scala_2.12-java17-minicluster parallelism: default: 1 # Default parallelism for Flink jobs @@ -129,14 +129,6 @@ flink: state: backend: rocksdb # Use RocksDB for state backend incremental: true - savepoints: - dir: "/opt/flink/savepoints" # Directory to store savepoints - pvcName: flink-savepoints-pvc # PVC for savepoints persistence - size: 10Gi # PVC size for savepoints storage - data: - dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb - pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb - size: 10Gi # PVC size for checkpoints/web-upload/rocksdb ha: dir: "/opt/flink/ha" # Directory to store ha data pvcName: flink-ha-pvc # PVC for ha @@ -149,5 +141,14 @@ flink: taskManager: numberOfTaskSlots: 12 # Number of task slots for TaskManager processMemory: 4096m # Size of task manager process memory - -# clusterId: some-id \ No newline at end of file + replicas: 1 + storage: + rocksDb: + size: 4Gi + resources: + limits: + cpu: 3 + memory: 4Gi + requests: + cpu: 1 + memory: 2Gi \ No newline at end of file diff --git a/helm/index.yaml b/helm/index.yaml index 678c5d9..f07333c 100644 --- a/helm/index.yaml +++ b/helm/index.yaml @@ -53,7 +53,7 @@ entries: version: 0.1.10 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.975218534+03:30" + created: "2025-03-04T18:04:35.495842696+03:30" description: Helm chart for flink kube operator digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c name: flink-kube-operator @@ -63,7 +63,7 @@ entries: version: 0.1.9 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.974750898+03:30" + created: "2025-03-04T18:04:35.495392608+03:30" description: Helm chart for flink kube operator digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675 name: flink-kube-operator @@ -73,7 +73,7 @@ entries: version: 0.1.8 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.974306458+03:30" + created: "2025-03-04T18:04:35.494948853+03:30" description: Helm chart for flink kube operator digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235 name: flink-kube-operator @@ -83,7 +83,7 @@ entries: version: 0.1.7 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.973833587+03:30" + created: "2025-03-04T18:04:35.49450822+03:30" description: Helm chart for flink kube operator digest: 
4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e name: flink-kube-operator @@ -93,7 +93,7 @@ entries: version: 0.1.6 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.972800097+03:30" + created: "2025-03-04T18:04:35.494040193+03:30" description: Helm chart for flink kube operator digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6 name: flink-kube-operator @@ -103,7 +103,7 @@ entries: version: 0.1.5 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.972374168+03:30" + created: "2025-03-04T18:04:35.493584927+03:30" description: Helm chart for flink kube operator digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a name: flink-kube-operator @@ -113,7 +113,7 @@ entries: version: 0.1.4 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.971952322+03:30" + created: "2025-03-04T18:04:35.493138547+03:30" description: Helm chart for flink kube operator digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f name: flink-kube-operator @@ -123,7 +123,7 @@ entries: version: 0.1.3 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.971461428+03:30" + created: "2025-03-04T18:04:35.492696005+03:30" description: Helm chart for flink kube operator digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2 name: flink-kube-operator @@ -133,7 +133,7 @@ entries: version: 0.1.2 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.968770748+03:30" + created: "2025-03-04T18:04:35.490170385+03:30" description: Helm chart for flink kube operator digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2 name: flink-kube-operator @@ -143,7 +143,7 @@ entries: version: 0.1.1 - apiVersion: v2 appVersion: 0.1.0 - created: "2025-04-04T13:50:27.968266924+03:30" + created: "2025-03-04T18:04:35.489734651+03:30" description: Helm chart for flink kube operator digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da name: flink-kube-operator diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 930da8c..e0dfe8a 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -10,13 +10,15 @@ import ( //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE type FlinkJobSpec struct { - Key string `json:"key"` - Name string `json:"name"` - Parallelism int `json:"parallelism"` - JarURI string `json:"jarUri"` - SavepointInterval metaV1.Duration `json:"savepointInterval"` - EntryClass string `json:"entryClass"` - Args []string `json:"args"` + Key string `json:"key"` + Name string `json:"name"` + Parallelism int `json:"parallelism"` + JarURI string `json:"jarUri"` + JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"` + JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"` + SavepointInterval metaV1.Duration `json:"savepointInterval"` + EntryClass string `json:"entryClass"` + Args []string `json:"args"` } type FlinkJobStatus struct { diff --git a/internal/jar/jar.go b/internal/jar/jar.go index 2fafc9b..78de3d4 100644 --- a/internal/jar/jar.go +++ b/internal/jar/jar.go @@ -2,10 +2,12 @@ package jar import ( "crypto/rand" + "encoding/base64" "encoding/hex" "errors" "io" "net/http" + "net/http/cookiejar" "os" "strings" @@ -16,13 +18,17 @@ import ( ) type JarFile struct { - uri string - filePath string + uri string + filePath string + basicAuthUsername *string + basicAuthPassword *string } -func NewJarFile(URI string) 
(*JarFile, error) { +func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) { jarFile := &JarFile{ - uri: URI, + uri: URI, + basicAuthUsername: basicAuthUsername, + basicAuthPassword: basicAuthPassword, } err := jarFile.Download() if err != nil { @@ -57,9 +63,45 @@ func (jarFile *JarFile) Download() error { } defer out.Close() - resp, err := http.Get(jarFile.uri) - if err != nil || resp.StatusCode > 299 { + + var resp *http.Response + if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil { + + basicAuth := func(username, password string) string { + auth := username + ":" + password + return base64.StdEncoding.EncodeToString([]byte(auth)) + } + + redirectPolicyFunc := func(req *http.Request, via []*http.Request) error { + req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword)) + return nil + } + + cookieJar, _ := cookiejar.New(nil) + client := &http.Client{ + Jar: cookieJar, + CheckRedirect: redirectPolicyFunc, + } + + req, reqErr := http.NewRequest("GET", jarFile.uri, nil) + if reqErr != nil { + jarFile.delete() + return reqErr + } + req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword)) + resp, err = client.Do(req) + } else { + resp, err = http.Get(jarFile.uri) + } + if err != nil { jarFile.delete() + pkg.Logger.Error("error in downloading jar", zap.Error(err)) + return err + } + if resp.StatusCode > 299 { + respBody, _ := io.ReadAll(resp.Body) + err = errors.New(string(respBody) + " status: " + resp.Status) + pkg.Logger.Error("error in downloading jar", zap.Error(err)) return err } diff --git a/internal/managed_job/jar.go b/internal/managed_job/jar.go index 515081e..0f376b4 100644 --- a/internal/managed_job/jar.go +++ b/internal/managed_job/jar.go @@ -9,7 +9,7 @@ import ( // upload jar file and set the jarId for later usages func (job *ManagedJob) upload() error { - jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) + jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword) if err != nil { pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err)) return err
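
For context, the jar.go change above threads optional basic-auth credentials from the FlinkJob spec through to the jar download. Below is a minimal, self-contained Go sketch of that download pattern (re-attaching the Authorization header on redirects via CheckRedirect, then checking the response status before copying the body). The download function name, URL, credentials, and destination path are placeholders for illustration only, not values from this repository.

// Hypothetical sketch of the basic-auth jar download pattern used in
// internal/jar/jar.go; all concrete values below are placeholders.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/cookiejar"
	"os"
)

func download(uri, user, pass, dest string) error {
	out, err := os.Create(dest)
	if err != nil {
		return err
	}
	defer out.Close()

	// Re-attach credentials after redirects, mirroring the operator's
	// CheckRedirect policy for artifact hosts that redirect downloads.
	jar, _ := cookiejar.New(nil)
	client := &http.Client{
		Jar: jar,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			req.SetBasicAuth(user, pass)
			return nil
		},
	}

	req, err := http.NewRequest(http.MethodGet, uri, nil)
	if err != nil {
		return err
	}
	req.SetBasicAuth(user, pass)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode > 299 {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("unexpected status %s: %s", resp.Status, body)
	}

	// Stream the jar to disk.
	_, err = io.Copy(out, resp.Body)
	return err
}

func main() {
	// Placeholder invocation; replace with a real artifact URL and credentials.
	if err := download("https://example.com/artifact.jar", "user", "secret", "/tmp/artifact.jar"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}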