From 5066dc650ff781f3e34b2bf9be48c544816ccd84 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Fri, 17 Jan 2025 20:27:32 +0330 Subject: [PATCH] feat: add start and trigger savepoint routes --- internal/crd/finalizer.go | 64 ++++++++++--------- internal/managed_job/savepoint.go | 9 +++ internal/rest/controller/crd.go | 103 +++++++++++++++++++++++++++--- internal/rest/router.go | 17 +++++ 4 files changed, 155 insertions(+), 38 deletions(-) diff --git a/internal/crd/finalizer.go b/internal/crd/finalizer.go index 517ae55..9b79b3f 100644 --- a/internal/crd/finalizer.go +++ b/internal/crd/finalizer.go @@ -17,40 +17,44 @@ func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) { finalizerName := "flink-operator.logicamp.tech/finalizer" for j := range jobEventObservable.Observe() { - jobEvent := j.V.(*FlinkJobCrdEvent) + go func() { + + jobEvent := j.V.(*FlinkJobCrdEvent) + + if jobEvent.Job.GetDeletionTimestamp() != nil { + // Resource is being deleted + if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { + // Perform cleanup + pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName())) + if err := crd.cleanupResources(jobEvent.Job); err != nil { + pkg.Logger.Error("[crd] [manage-finalizer] cleanup failed", zap.Error(err)) + return + } + + // Remove finalizer + controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName) + if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { + pkg.Logger.Error("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err)) + return + } + pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName())) - if jobEvent.Job.GetDeletionTimestamp() != nil { - // Resource is being deleted - if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { - // Perform cleanup - pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName())) - if err := crd.cleanupResources(jobEvent.Job); err != nil { - pkg.Logger.Error("[crd] [manage-finalizer] cleanup failed", zap.Error(err)) - return } - - // Remove finalizer - controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName) - if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { - pkg.Logger.Error("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err)) - return - } - pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName())) - - } - return - } - - // Add finalizer if not present - if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { - controllerutil.AddFinalizer(jobEvent.Job, finalizerName) - pkg.Logger.Debug("[finalizer] adding job") - // Update the resource to add the finalizer - if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { - pkg.Logger.Error("[finalizer] failed to add", zap.Error(err)) return } - } + + // Add finalizer if not present + if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) { + controllerutil.AddFinalizer(jobEvent.Job, finalizerName) + pkg.Logger.Debug("[finalizer] adding job") + // Update the resource to add the finalizer + if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil { + pkg.Logger.Error("[finalizer] failed to add", zap.Error(err)) + return + } + } + + }() } } diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index bc4beac..ce984f6 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -80,3 +80,12 @@ func (job ManagedJob) trackSavepoint() error { return nil } + +func (job ManagedJob) TriggerSavepoint() error { + err := job.createSavepoint() + if err != nil { + return err + } + err = job.trackSavepoint() + return err +} diff --git a/internal/rest/controller/crd.go b/internal/rest/controller/crd.go index 49b7871..17f1f60 100644 --- a/internal/rest/controller/crd.go +++ b/internal/rest/controller/crd.go @@ -1,6 +1,7 @@ package controller import ( + "archive/zip" "bufio" "context" "errors" @@ -12,7 +13,6 @@ import ( "fmt" "io" "os" - "os/exec" "path/filepath" "github.com/danielgtaylor/huma/v2" @@ -62,6 +62,18 @@ func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { } func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { + mgr := managed_job.GetManager() + job := mgr.GetJob(k8sTypes.UID(req.JobUId)) + err := job.Run(false) + if err != nil { + return nil, err + } + return &StopJobResp{Body: StopJobRespBody{ + Success: true, + }}, nil +} + +func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { mgr := managed_job.GetManager() job := mgr.GetJob(k8sTypes.UID(req.JobUId)) err := job.Run(true) @@ -91,6 +103,30 @@ func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { }}, nil } +type JobTriggerSavepointReq struct { + JobUId string `path:"uid"` +} + +type JobTriggerSavepointRespBody struct { + Success bool `json:"success"` +} + +type JobTriggerSavepointResp struct { + Body JobTriggerSavepointRespBody +} + +func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) { + mgr := managed_job.GetManager() + job := mgr.GetJob(k8sTypes.UID(req.JobUId)) + err := job.TriggerSavepoint() + if err != nil { + return nil, err + } + return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{ + Success: true, + }}, nil +} + func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) { folderPath := req.SavepointPath // Change this to your folder path @@ -102,15 +138,66 @@ func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*h zap.String("folderPath", folderPath), ) - // Run the zip command - cmd := exec.Command("zip", "-r", zipFilePath, folderPath) - - // Capture any output or errors - output, err := cmd.CombinedOutput() + // Create the zip file + zipFile, err := os.Create(zipFilePath) if err != nil { - return nil, fmt.Errorf("failed to run zip command: %v\nOutput: %s", err, string(output)) + fmt.Println("Error creating zip file:", err) + return nil, nil } - pkg.Logger.Debug("[controller] [savepoint]", zap.Any("zip command output", string(output))) + defer zipFile.Close() + + // Create a new zip writer + zipWriter := zip.NewWriter(zipFile) + defer zipWriter.Close() + + // Walk through the source directory and add files to the zip + err = filepath.Walk(folderPath, func(filePath string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // Create a new file header + header, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + + // Set the header name to the relative path + header.Name, err = filepath.Rel(folderPath, filePath) + if err != nil { + return err + } + + // If it's a directory, add a trailing slash + if info.IsDir() { + header.Name += "/" + } else { + // Set the compression method + header.Method = zip.Deflate + } + + // Create a new writer for the file + writer, err := zipWriter.CreateHeader(header) + if err != nil { + return err + } + + // If it's a directory, we're done + if info.IsDir() { + return nil + } + + // Open the file to be zipped + file, err := os.Open(filePath) + if err != nil { + return err + } + defer file.Close() + + // Copy the file content to the zip writer + _, err = io.Copy(writer, file) + return err + }) // Open the zip file for reading zipFileReader, err := os.Open(zipFilePath) diff --git a/internal/rest/router.go b/internal/rest/router.go index 5c52067..695a54e 100644 --- a/internal/rest/router.go +++ b/internal/rest/router.go @@ -35,6 +35,15 @@ func initRouter(api huma.API) { Tags: []string{"Job"}, }, controller.StartJob) + huma.Register(api, huma.Operation{ + OperationID: "resume-job", + Method: http.MethodPost, + Path: "/jobs/{uid}/resume", + Summary: "Resume Job", + Description: "Resume Flink Job", + Tags: []string{"Job"}, + }, controller.ResumeJob) + huma.Register(api, huma.Operation{ OperationID: "remove-jar", Method: http.MethodPost, @@ -62,4 +71,12 @@ func initRouter(api huma.API) { Tags: []string{"Savepoint"}, }, controller.DownloadSavepoint) + huma.Register(api, huma.Operation{ + OperationID: "trigger-savepoint", + Method: http.MethodPost, + Path: "/jobs/{uid}/trigger-savepoint", + Summary: "Trigger Savepoint", + Description: "Trigger Savepoint", + Tags: []string{"Savepoint"}, + }, controller.TriggerSavepoint) }