feat: add download savepoint route

This commit is contained in:
Mohamad Khani 2025-01-17 18:34:06 +03:30
parent ef7b16af68
commit c977c8a15d
7 changed files with 159 additions and 71 deletions

View File

@ -77,6 +77,7 @@ spec:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
@ -84,7 +85,6 @@ spec:
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"]
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
@ -117,6 +117,36 @@ spec:
{{- include "flink.env" . | nindent 12 }}
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: localhost:8081
- name: SAVEPOINT_PATH
value: file://{{ .Values.flink.state.savepoints.dir }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
volumes:
{{- include "flink.volumes" . | nindent 8 }}
{{- include "flink.volumes" . | nindent 8 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -8,7 +8,11 @@ metadata:
spec:
ports:
- port: 8081
name: flink-web-ui
targetPort: 8081
- port: 3000
name: operator
targetPort: 3000
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}

View File

@ -1,63 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "flink-kube-operator.fullname" . }}
labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "flink-kube-operator.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "flink-kube-operator.labels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Values.config.flinkApiUrl }}
- name: SAVEPOINT_PATH
value: file://{{ .Values.flink.state.savepoints.dir }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -11,6 +11,18 @@ import (
func Init() {
app := fiber.New()
app.Use(func(c *fiber.Ctx) error {
// Logic to execute before the next handler
fmt.Printf("Request Method: %s, URL: %s\n", c.Method(), c.OriginalURL())
// Call the next handler in the stack
err := c.Next()
// Logic to execute after the next handler
fmt.Println("Request completed")
return err
})
config := huma.DefaultConfig("Go API", "1.0.0")
config.Servers = []*huma.Server{{}}
config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{
@ -21,6 +33,12 @@ func Init() {
},
}
api := humaFiber.New(app, config)
api.UseMiddleware(
func(ctx huma.Context, next func(huma.Context)) {
ctx = huma.WithValue(ctx, "humaContext", ctx)
next(ctx)
},
)
initRouter(api)

View File

@ -1,12 +1,23 @@
package controller
import (
"bufio"
"context"
"errors"
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/managed_job"
"flink-kube-operator/internal/rest/types"
"flink-kube-operator/pkg"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"k8s.io/apimachinery/pkg/types"
"github.com/danielgtaylor/huma/v2"
"go.uber.org/zap"
k8sTypes "k8s.io/apimachinery/pkg/types"
)
type GetJobsReq struct {
@ -40,7 +51,7 @@ type StopJobResp struct {
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Stop()
if err != nil {
return nil, err
@ -52,7 +63,7 @@ func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Run(true)
if err != nil {
return nil, err
@ -64,7 +75,7 @@ func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
job.RemoveJar()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
@ -73,9 +84,75 @@ func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
job.Pause()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
folderPath := req.SavepointPath // Change this to your folder path
// Create a temporary zip file
zipFilePath, err := filepath.Abs("./savepoint.zip")
pkg.Logger.Debug("[controller] [savepoint]",
zap.String("zipFileName", zipFilePath),
zap.String("folderPath", folderPath),
)
// Run the zip command
cmd := exec.Command("zip", "-r", zipFilePath, folderPath)
// Capture any output or errors
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to run zip command: %v\nOutput: %s", err, string(output))
}
pkg.Logger.Debug("[controller] [savepoint]", zap.Any("zip command output", string(output)))
// Open the zip file for reading
zipFileReader, err := os.Open(zipFilePath)
if err != nil {
return nil, fmt.Errorf("failed to open zip file: %w", err)
}
//defer zipFileReader.Close()
fileInfo, err := zipFileReader.Stat()
if err != nil {
return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
}
resp := &huma.StreamResponse{
Body: func(ctx huma.Context) {
ctx.SetHeader("Content-Type", "application/zip")
ctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
ctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%s", zipFilePath))
writer := ctx.BodyWriter()
br := bufio.NewReader(zipFileReader)
for {
b, err := br.ReadByte()
if err != nil && !errors.Is(err, io.EOF) {
fmt.Println(err)
break
}
// process the one byte b
if err != nil {
// end of file
break
}
writer.Write([]byte{b})
}
os.Remove(zipFilePath)
},
}
return resp, nil
}

View File

@ -52,4 +52,14 @@ func initRouter(api huma.API) {
Description: "Pause Flink Job",
Tags: []string{"Job"},
}, controller.PauseJob)
huma.Register(api, huma.Operation{
OperationID: "download-savepoint",
Method: http.MethodGet,
Path: "/savepoint/download",
Summary: "Download Savepoint",
Description: "Download Savepoint",
Tags: []string{"Savepoint"},
}, controller.DownloadSavepoint)
}

View File

@ -0,0 +1,12 @@
package types
type SavepointDownloadReq struct {
SavepointPath string `query:"savepoint-path"`
}
type SavepointDownloadResp struct {
ContentType string `header:"Content-Type"`
ContentDisposition string `header:"Content-Disposition"`
ContentLength string `header:"Content-Length"`
Body []byte
}