From c977c8a15d6033db7cd6ec9708acfe24e224d450 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Fri, 17 Jan 2025 18:34:06 +0330 Subject: [PATCH] feat: add download savepoint route --- helm/chart/templates/flink/deploy.yaml | 36 +++++++- helm/chart/templates/flink/service.yaml | 4 + helm/chart/templates/operator/deployment.yaml | 63 -------------- internal/rest/base.go | 18 ++++ internal/rest/controller/crd.go | 87 +++++++++++++++++-- internal/rest/router.go | 10 +++ internal/rest/types/savepoint.go | 12 +++ 7 files changed, 159 insertions(+), 71 deletions(-) create mode 100644 internal/rest/types/savepoint.go diff --git a/helm/chart/templates/flink/deploy.yaml b/helm/chart/templates/flink/deploy.yaml index 1448c1f..ddb5459 100644 --- a/helm/chart/templates/flink/deploy.yaml +++ b/helm/chart/templates/flink/deploy.yaml @@ -77,6 +77,7 @@ spec: app.kubernetes.io/name: {{ .Release.Name }}-flink app.kubernetes.io/instance: {{ .Release.Name }} spec: + serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} initContainers: - name: volume-mount-hack image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} @@ -84,7 +85,6 @@ spec: command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"] volumeMounts: {{- include "flink.volumeMounts" . | nindent 12 }} - serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} containers: - name: jobmanager image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }} @@ -117,6 +117,36 @@ spec: {{- include "flink.env" . | nindent 12 }} volumeMounts: {{- include "flink.volumeMounts" . 
| nindent 12 }} - + - name: operator + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: http + containerPort: {{ .Values.service.port }} + protocol: TCP + env: + - name: FLINK_API_URL + value: localhost:8081 + - name: SAVEPOINT_PATH + value: file://{{ .Values.flink.state.savepoints.dir }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + volumeMounts: + {{- include "flink.volumeMounts" . | nindent 12 }} volumes: - {{- include "flink.volumes" . | nindent 8 }} \ No newline at end of file + {{- include "flink.volumes" . | nindent 8 }} + + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/helm/chart/templates/flink/service.yaml b/helm/chart/templates/flink/service.yaml index 9e2b0fd..5a7cc1b 100644 --- a/helm/chart/templates/flink/service.yaml +++ b/helm/chart/templates/flink/service.yaml @@ -8,7 +8,11 @@ metadata: spec: ports: - port: 8081 + name: flink-web-ui targetPort: 8081 + - port: 3000 + name: operator + targetPort: 3000 selector: app.kubernetes.io/name: {{ .Release.Name }}-flink app.kubernetes.io/instance: {{ .Release.Name }} diff --git a/helm/chart/templates/operator/deployment.yaml b/helm/chart/templates/operator/deployment.yaml index 3f90cc5..e69de29 100644 --- a/helm/chart/templates/operator/deployment.yaml +++ b/helm/chart/templates/operator/deployment.yaml @@ -1,63 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ include "flink-kube-operator.fullname" . }} - labels: - {{- include "flink-kube-operator.labels" . 
| nindent 4 }} -spec: - {{- if not .Values.autoscaling.enabled }} - replicas: {{ .Values.replicaCount }} - {{- end }} - selector: - matchLabels: - {{- include "flink-kube-operator.selectorLabels" . | nindent 6 }} - template: - metadata: - {{- with .Values.podAnnotations }} - annotations: - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - {{- include "flink-kube-operator.labels" . | nindent 8 }} - {{- with .Values.podLabels }} - {{- toYaml . | nindent 8 }} - {{- end }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: - {{- toYaml . | nindent 8 }} - {{- end }} - serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} - securityContext: - {{- toYaml .Values.podSecurityContext | nindent 8 }} - containers: - - name: {{ .Chart.Name }} - securityContext: - {{- toYaml .Values.securityContext | nindent 12 }} - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - ports: - - name: http - containerPort: {{ .Values.service.port }} - protocol: TCP - env: - - name: FLINK_API_URL - value: {{ .Values.config.flinkApiUrl }} - - name: SAVEPOINT_PATH - value: file://{{ .Values.flink.state.savepoints.dir }} - resources: - {{- toYaml .Values.resources | nindent 12 }} - - - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . 
| nindent 8 }} - {{- end }} diff --git a/internal/rest/base.go b/internal/rest/base.go index 6b24472..d8c0bd3 100644 --- a/internal/rest/base.go +++ b/internal/rest/base.go @@ -11,6 +11,18 @@ import ( func Init() { app := fiber.New() + app.Use(func(c *fiber.Ctx) error { + // Logic to execute before the next handler + fmt.Printf("Request Method: %s, URL: %s\n", c.Method(), c.OriginalURL()) + + // Call the next handler in the stack + err := c.Next() + + // Logic to execute after the next handler + fmt.Println("Request completed") + + return err + }) config := huma.DefaultConfig("Go API", "1.0.0") config.Servers = []*huma.Server{{}} config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{ @@ -21,6 +33,12 @@ func Init() { }, } api := humaFiber.New(app, config) + api.UseMiddleware( + func(ctx huma.Context, next func(huma.Context)) { + ctx = huma.WithValue(ctx, "humaContext", ctx) + next(ctx) + }, + ) initRouter(api) diff --git a/internal/rest/controller/crd.go b/internal/rest/controller/crd.go index 3dca138..49b7871 100644 --- a/internal/rest/controller/crd.go +++ b/internal/rest/controller/crd.go @@ -1,12 +1,23 @@ package controller import ( + "bufio" "context" + "errors" "flink-kube-operator/internal/crd" "flink-kube-operator/internal/crd/v1alpha1" "flink-kube-operator/internal/managed_job" + "flink-kube-operator/internal/rest/types" + "flink-kube-operator/pkg" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" - "k8s.io/apimachinery/pkg/types" + "github.com/danielgtaylor/huma/v2" + "go.uber.org/zap" + k8sTypes "k8s.io/apimachinery/pkg/types" ) type GetJobsReq struct { @@ -40,7 +51,7 @@ type StopJobResp struct { func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { mgr := managed_job.GetManager() - job := mgr.GetJob(types.UID(req.JobUId)) + job := mgr.GetJob(k8sTypes.UID(req.JobUId)) err := job.Stop() if err != nil { return nil, err @@ -52,7 +63,7 @@ func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { func 
StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
 	mgr := managed_job.GetManager()
-	job := mgr.GetJob(types.UID(req.JobUId))
+	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
 	err := job.Run(true)
 	if err != nil {
 		return nil, err
@@ -64,7 +75,7 @@ func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
 
 func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
 	mgr := managed_job.GetManager()
-	job := mgr.GetJob(types.UID(req.JobUId))
+	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
 	job.RemoveJar()
 	return &StopJobResp{Body: StopJobRespBody{
 		Success: true,
@@ -73,9 +84,90 @@ func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
 
 func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
 	mgr := managed_job.GetManager()
-	job := mgr.GetJob(types.UID(req.JobUId))
+	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
 	job.Pause()
 	return &StopJobResp{Body: StopJobRespBody{
 		Success: true,
 	}}, nil
 }
+
+// DownloadSavepoint zips the directory named by the savepoint-path query
+// parameter and streams the archive back as an application/zip attachment.
+//
+// The archive is built by the external zip binary inside a per-request
+// temporary directory, so concurrent downloads cannot overwrite each other.
+// The directory is removed once the response has been written, or as soon as
+// any preparation step fails.
+func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
+	folderPath := req.SavepointPath
+
+	// folderPath becomes a zip argument: reject values zip would read as an option.
+	if folderPath == "" || folderPath[0] == '-' {
+		return nil, huma.Error400BadRequest("savepoint-path must be a non-empty path that does not start with '-'")
+	}
+	// NOTE(review): any path readable by the operator is still accepted here;
+	// consider restricting it to the configured savepoint directory.
+	if _, err := os.Stat(folderPath); err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return nil, huma.Error404NotFound("savepoint path does not exist")
+		}
+		return nil, fmt.Errorf("failed to inspect savepoint path: %w", err)
+	}
+
+	// A private directory per request keeps concurrent downloads apart and lets
+	// zip create the archive itself instead of updating a leftover file.
+	workDir, err := os.MkdirTemp("", "savepoint-*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create temporary directory: %w", err)
+	}
+	cleanup := func() {
+		if rmErr := os.RemoveAll(workDir); rmErr != nil {
+			pkg.Logger.Error("[controller] [savepoint] failed to remove temporary directory", zap.Error(rmErr))
+		}
+	}
+	zipFilePath := filepath.Join(workDir, "savepoint.zip")
+
+	pkg.Logger.Debug("[controller] [savepoint]",
+		zap.String("zipFileName", zipFilePath),
+		zap.String("folderPath", folderPath),
+	)
+
+	// Bind zip to ctx so that cancelling the context also stops the command.
+	output, err := exec.CommandContext(ctx, "zip", "-r", zipFilePath, folderPath).CombinedOutput()
+	if err != nil {
+		cleanup()
+		return nil, fmt.Errorf("failed to run zip command: %w\nOutput: %s", err, output)
+	}
+	pkg.Logger.Debug("[controller] [savepoint]", zap.Any("zip command output", string(output)))
+
+	zipFile, err := os.Open(zipFilePath)
+	if err != nil {
+		cleanup()
+		return nil, fmt.Errorf("failed to open zip file: %w", err)
+	}
+
+	fileInfo, err := zipFile.Stat()
+	if err != nil {
+		zipFile.Close()
+		cleanup()
+		return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
+	}
+
+	return &huma.StreamResponse{
+		Body: func(hctx huma.Context) {
+			// From here on the callback owns the open file and the temporary directory.
+			defer cleanup()
+			defer zipFile.Close()
+
+			hctx.SetHeader("Content-Type", "application/zip")
+			hctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
+			// Send only the base name so the server's filesystem layout is not exposed.
+			hctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(zipFilePath)))
+
+			// Stream in buffered chunks rather than issuing one Write per byte.
+			if _, err := io.Copy(hctx.BodyWriter(), bufio.NewReader(zipFile)); err != nil {
+				pkg.Logger.Error("[controller] [savepoint] failed to stream zip file", zap.Error(err))
+			}
+		},
+	}, nil
+}
diff --git a/internal/rest/router.go b/internal/rest/router.go
index a63da90..5c52067 100644
--- a/internal/rest/router.go
+++ b/internal/rest/router.go
@@ -52,4 +52,14 @@ func initRouter(api huma.API) {
 		Description: "Pause Flink Job",
 		Tags:        []string{"Job"},
 	}, controller.PauseJob)
+
+	huma.Register(api, huma.Operation{
+		OperationID: "download-savepoint",
+		Method:      http.MethodGet,
+		Path:        "/savepoint/download",
+		Summary:     "Download Savepoint",
+		Description: "Download Savepoint",
+		Tags:        []string{"Savepoint"},
+	}, controller.DownloadSavepoint)
+
 }
diff --git a/internal/rest/types/savepoint.go b/internal/rest/types/savepoint.go
new file mode 100644
index 0000000..a6416cb
--- /dev/null
+++ b/internal/rest/types/savepoint.go
@@ -0,0 +1,12 @@
+package types
+
+type SavepointDownloadReq struct {
+	SavepointPath string `query:"savepoint-path"`
+}
+
+type SavepointDownloadResp struct {
+	ContentType        string `header:"Content-Type"`
+	ContentDisposition string `header:"Content-Disposition"`
+	ContentLength      string `header:"Content-Length"`
+	Body               []byte
+}