Compare commits

..

2 Commits

6 changed files with 18 additions and 4 deletions

View File

@ -2,5 +2,5 @@ apiVersion: v2
name: flink-kube-operator name: flink-kube-operator
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
type: application type: application
version: 0.1.9 version: 0.1.10
appVersion: "0.1.0" appVersion: "0.1.0"

View File

@ -18,13 +18,15 @@
high-availability.type: kubernetes high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }} kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: cluster-one kubernetes.cluster-id: cluster-one
execution.checkpointing.interval: 5min
execution.checkpointing.mode: EXACTLY_ONCE
web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }} high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }} state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
state.backend.incremental: false state.backend.incremental: false
rest.profiling.enabled: true rest.profiling.enabled: true
{{- end }} {{- end }}
{{- define "flink.volumeMounts" -}} {{- define "flink.volumeMounts" -}}

Binary file not shown.

View File

@ -89,3 +89,7 @@ func (job ManagedJob) TriggerSavepoint() error {
err = job.trackSavepoint() err = job.trackSavepoint()
return err return err
} }
func (job ManagedJob) GetLastSavepointPath() *string {
return job.def.Status.LastSavepointPath
}

View File

@ -14,6 +14,7 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/danielgtaylor/huma/v2" "github.com/danielgtaylor/huma/v2"
"go.uber.org/zap" "go.uber.org/zap"
@ -128,7 +129,14 @@ func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTri
} }
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) { func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
folderPath := req.SavepointPath // Change this to your folder path mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUID))
lastSavepointPath := job.GetLastSavepointPath()
if lastSavepointPath == nil {
return nil, huma.Error404NotFound("there is no savepoint path is registered for the job")
}
folderPath := strings.TrimLeft(*lastSavepointPath, "file:") // Change this to your folder path
// Create a temporary zip file // Create a temporary zip file
zipFilePath, err := filepath.Abs("./savepoint.zip") zipFilePath, err := filepath.Abs("./savepoint.zip")

View File

@ -1,7 +1,7 @@
package types package types
type SavepointDownloadReq struct { type SavepointDownloadReq struct {
SavepointPath string `query:"savepoint-path"` JobUID string `query:"jobUID"`
} }
type SavepointDownloadResp struct { type SavepointDownloadResp struct {