{{- /*
flink.env
Environment variables shared by the jobmanager and taskmanager containers
(both include this template). FLINK_PROPERTIES is a multi-line value assembled
from the chart's flink values and the release name/namespace.

NOTE(review): the two "rootLogger" lines inside FLINK_PROPERTIES use log4j
"key = value" syntax while every other entry is "key: value"; confirm the Flink
image accepts or ignores them, otherwise move them to a log4j properties file.
*/ -}}
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
  value: "localhost"
- name: NAMESPACE
  value: {{ .Release.Namespace | quote }}
- name: FLINK_PROPERTIES
  value: |
    jobmanager.rpc.address: localhost
    jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
    taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
    taskmanager.data.port: 6125
    taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
    parallelism.default: {{ .Values.flink.parallelism.default }}
    state.backend: {{ .Values.flink.state.backend }}
    rest.port: 8081
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    high-availability.type: kubernetes
    kubernetes.namespace: {{ .Release.Namespace }}
    kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
    execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
    execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
    web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload
    state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
    state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
    high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
    state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
    state.backend.incremental: {{ .Values.flink.state.incremental }}
    rest.profiling.enabled: true
{{- end }}

{{- /*
flink.volumeMounts
Volume mounts used by the init container and all three main containers.
The flink-data volume is mounted once whole at <data.dir>/data and three more
times through the rocksdb, checkpoints and web-upload sub-paths; flink-ha and
flink-savepoints are each mounted at their configured directory.
*/ -}}
{{- define "flink.volumeMounts" -}}
- name: flink-data
  mountPath: {{ .Values.flink.state.data.dir }}/data
- name: flink-data
  mountPath: {{ .Values.flink.state.data.dir }}/rocksdb
  subPath: rocksdb
- name: flink-data
  mountPath: {{ .Values.flink.state.data.dir }}/checkpoints
  subPath: checkpoints
- name: flink-data
  mountPath: {{ .Values.flink.state.data.dir }}/web-upload
  subPath: web-upload
- name: flink-ha
  mountPath: {{ .Values.flink.state.ha.dir }}
- name: flink-savepoints
  mountPath: {{ .Values.flink.state.savepoints.dir }}
{{- end }}

{{- /*
flink.volumes
Pod-level volumes backing flink.volumeMounts. Each one references an existing
PersistentVolumeClaim whose name comes from the chart values.
*/ -}}
{{- define "flink.volumes" -}}
- name: flink-data
  persistentVolumeClaim:
    claimName: {{ .Values.flink.state.data.pvcName }}
- name: flink-savepoints
  persistentVolumeClaim:
    claimName: {{ .Values.flink.state.savepoints.pvcName }}
- name: flink-ha
  persistentVolumeClaim:
    claimName: {{ .Values.flink.state.ha.pvcName }}
{{- end }}
# Single-replica Deployment whose pod runs the Flink jobmanager, the Flink
# taskmanager and the operator side by side. The containers reach each other
# over localhost (see JOB_MANAGER_RPC_ADDRESS and FLINK_API_URL).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-flink
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}-flink
    # Quoted so that an all-digit release name is still rendered as a string.
    app.kubernetes.io/instance: {{ .Release.Name | quote }}
spec:
  replicas: 1
  # Recreate: the old pod is removed before the new one is started.
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app.kubernetes.io/name: {{ .Release.Name }}-flink
      app.kubernetes.io/instance: {{ .Release.Name | quote }}
  template:
    metadata:
      labels:
        app.kubernetes.io/name: {{ .Release.Name }}-flink
        app.kubernetes.io/instance: {{ .Release.Name | quote }}
    spec:
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
      initContainers:
        # Hands ownership of every mounted state directory to the "flink" user
        # before the main containers start.
        - name: volume-mount-hack
          image: "{{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}"
          # runAsUser is a SecurityContext field, not a Container field, so it
          # has to be nested here for the chown below to run as root.
          securityContext:
            runAsUser: 0
          command:
            - "sh"
            - "-c"
            # Folded scalar: the lines below are joined with single spaces
            # into one shell command.
            - >-
              chown -R flink
              {{ .Values.flink.state.data.dir }}/data
              {{ .Values.flink.state.data.dir }}/rocksdb
              {{ .Values.flink.state.data.dir }}/checkpoints
              {{ .Values.flink.state.data.dir }}/web-upload
              {{ .Values.flink.state.ha.dir }}
              {{ .Values.flink.state.savepoints.dir }}
          volumeMounts:
            {{- include "flink.volumeMounts" . | nindent 12 }}
      containers:
        - name: jobmanager
          image: "{{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}"
          imagePullPolicy: Always
          args: ["jobmanager"]
          ports:
            - containerPort: 6123  # JobManager RPC port
              name: rpc
            - containerPort: 6124  # JobManager blob server port
              name: blob
            - containerPort: 6125  # JobManager queryable state port
              name: query
            - containerPort: 8081  # JobManager Web UI port
              name: ui
          env:
            {{- include "flink.env" . | nindent 12 }}
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            {{- include "flink.volumeMounts" . | nindent 12 }}
        - name: taskmanager
          image: "{{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}"
          imagePullPolicy: Always
          args: ["taskmanager"]
          # NOTE(review): flink.env sets taskmanager.data.port to 6125 while the
          # data port declared here is 6121, and the port name "rpc" is also
          # used by the jobmanager container; confirm both are intended.
          ports:
            - containerPort: 6121  # TaskManager data port
              name: data
            - containerPort: 6122  # TaskManager RPC port
              name: rpc
          env:
            {{- include "flink.env" . | nindent 12 }}
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            {{- include "flink.volumeMounts" . | nindent 12 }}
        - name: operator
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - name: http
              containerPort: {{ .Values.service.port }}
              protocol: TCP
          env:
            # Same port as rest.port in FLINK_PROPERTIES, reached over the
            # pod's shared loopback interface.
            - name: FLINK_API_URL
              value: "localhost:8081"
            - name: SAVEPOINT_PATH
              value: "file://{{ .Values.flink.state.savepoints.dir }}"
            - name: NAMESPACE
              value: "{{ .Release.Namespace }}"
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
          volumeMounts:
            {{- include "flink.volumeMounts" . | nindent 12 }}
      volumes:
        {{- include "flink.volumes" . | nindent 8 }}
      # The scheduling blocks below are rendered only when the corresponding
      # value is set.
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}