From 14aba80181b0481e9d6d27dddb30b85df0046763 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Fri, 10 Jan 2025 12:39:36 +0330 Subject: [PATCH] feat: separate job-manager and task-manager containers --- helm/chart/templates/flink/deploy.yaml | 161 +++++++++++++------------ 1 file changed, 84 insertions(+), 77 deletions(-) diff --git a/helm/chart/templates/flink/deploy.yaml b/helm/chart/templates/flink/deploy.yaml index ac279da..1448c1f 100644 --- a/helm/chart/templates/flink/deploy.yaml +++ b/helm/chart/templates/flink/deploy.yaml @@ -1,3 +1,61 @@ +{{- define "flink.env" -}} +- name: JOB_MANAGER_RPC_ADDRESS + value: "localhost" +- name: NAMESPACE + value: {{ .Release.Namespace }} +- name: FLINK_PROPERTIES + value: | + jobmanager.rpc.address: localhost + jobmanager.memory.process.size: 2048m + taskmanager.memory.process.size: 2048m + taskmanager.data.port: 6125 + taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }} + parallelism.default: {{ .Values.flink.parallelism.default }} + state.backend: {{ .Values.flink.state.backend }} + rest.port: 8081 + rootLogger.level = DEBUG + rootLogger.appenderRef.console.ref = ConsoleAppender + high-availability.type: kubernetes + kubernetes.namespace: {{ .Release.Namespace }} + kubernetes.cluster-id: cluster-one + web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload + state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints + state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb + high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }} + state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }} + state.backend.incremental: false +{{- end }} + +{{- define "flink.volumeMounts" -}} +- name: flink-data + mountPath: {{ .Values.flink.state.data.dir }}/data +- name: flink-data + mountPath: {{ .Values.flink.state.data.dir }}/rocksdb + subPath: rocksdb +- name: flink-data + mountPath: {{ .Values.flink.state.data.dir }}/checkpoints + subPath: checkpoints +- name: flink-data + mountPath: {{ .Values.flink.state.data.dir }}/web-upload + subPath: web-upload +- name: flink-ha + mountPath: {{ .Values.flink.state.ha.dir }} +- name: flink-savepoints + mountPath: {{ .Values.flink.state.savepoints.dir }} +{{- end }} + +{{- define "flink.volumes" -}} +- name: flink-data + persistentVolumeClaim: + claimName: {{ .Values.flink.state.data.pvcName }} +- name: flink-savepoints + persistentVolumeClaim: + claimName: {{ .Values.flink.state.savepoints.pvcName }} +- name: flink-ha + persistentVolumeClaim: + claimName: {{ .Values.flink.state.ha.pvcName }} +{{- end }} + apiVersion: apps/v1 kind: Deployment metadata: @@ -25,91 +83,40 @@ spec: runAsUser: 0 command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"] volumeMounts: - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/data - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/rocksdb - subPath: rocksdb - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/checkpoints - subPath: checkpoints - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/web-upload - subPath: web-upload - - name: flink-ha - mountPath: {{ .Values.flink.state.ha.dir }} - - name: flink-savepoints - mountPath: {{ .Values.flink.state.savepoints.dir }} + {{- include "flink.volumeMounts" . | nindent 12 }} serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} containers: - - name: flink + - name: jobmanager image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} imagePullPolicy: Always - livenessProbe: - httpGet: - path: / - port: 8081 - failureThreshold: 8 - readinessProbe: - httpGet: - path: / - port: 8081 - failureThreshold: 8 + args: ["jobmanager"] ports: + - containerPort: 6123 # JobManager RPC port + name: rpc + - containerPort: 6124 # JobManager blob server port + name: blob + - containerPort: 6125 # JobManager queryable state port + name: query - containerPort: 8081 # JobManager Web UI port - - containerPort: 6121 # TaskManager communication port - - containerPort: 6122 # TaskManager communication port + name: ui env: - - name: JOB_MANAGER_RPC_ADDRESS - value: "localhost" # JobManager and TaskManager in the same container - - name: NAMESPACE - value: {{ .Release.Namespace }} - - name: FLINK_PROPERTIES - value: | - jobmanager.rpc.address: localhost - jobmanager.memory.process.size: 2048m - taskmanager.memory.process.size: 2048m - taskmanager.data.port: 6125 - taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }} - parallelism.default: {{ .Values.flink.parallelism.default }} - state.backend: {{ .Values.flink.state.backend }} - rest.port: 8081 - rootLogger.level = DEBUG - rootLogger.appenderRef.console.ref = ConsoleAppender - high-availability.type: kubernetes - kubernetes.namespace: {{ .Release.Namespace }} - kubernetes.cluster-id: cluster-one - web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload - state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints - state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb - high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }} - state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }} - state.backend.incremental: false + {{- include "flink.env" . | nindent 12 }} volumeMounts: - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/data - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/rocksdb - subPath: rocksdb - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/checkpoints - subPath: checkpoints - - name: flink-data - mountPath: {{ .Values.flink.state.data.dir }}/web-upload - subPath: web-upload - - name: flink-ha - mountPath: {{ .Values.flink.state.ha.dir }} - - name: flink-savepoints - mountPath: {{ .Values.flink.state.savepoints.dir }} + {{- include "flink.volumeMounts" . | nindent 12 }} + - name: taskmanager + image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} + imagePullPolicy: Always + args: ["taskmanager"] + ports: + - containerPort: 6121 # TaskManager data port + name: data + - containerPort: 6122 # TaskManager RPC port + name: rpc + env: + {{- include "flink.env" . | nindent 12 }} + volumeMounts: + {{- include "flink.volumeMounts" . | nindent 12 }} volumes: - - name: flink-data - persistentVolumeClaim: - claimName: {{ .Values.flink.state.data.pvcName }} # PVC for savepoints persistence - - name: flink-savepoints - persistentVolumeClaim: - claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence - - name: flink-ha - persistentVolumeClaim: - claimName: {{ .Values.flink.state.ha.pvcName }} # PVC for savepoints persistence \ No newline at end of file + {{- include "flink.volumes" . | nindent 8 }} \ No newline at end of file