feat: add start and trigger savepoint routes

This commit is contained in:
Mohamad Khani 2025-01-17 20:27:32 +03:30
parent c977c8a15d
commit 5066dc650f
4 changed files with 155 additions and 38 deletions

View File

@ -17,40 +17,44 @@ func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
finalizerName := "flink-operator.logicamp.tech/finalizer" finalizerName := "flink-operator.logicamp.tech/finalizer"
for j := range jobEventObservable.Observe() { for j := range jobEventObservable.Observe() {
jobEvent := j.V.(*FlinkJobCrdEvent) go func() {
jobEvent := j.V.(*FlinkJobCrdEvent)
if jobEvent.Job.GetDeletionTimestamp() != nil {
// Resource is being deleted
if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
// Perform cleanup
pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
if err := crd.cleanupResources(jobEvent.Job); err != nil {
pkg.Logger.Error("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
return
}
// Remove finalizer
controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Error("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
return
}
pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))
if jobEvent.Job.GetDeletionTimestamp() != nil {
// Resource is being deleted
if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
// Perform cleanup
pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
if err := crd.cleanupResources(jobEvent.Job); err != nil {
pkg.Logger.Error("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
return
} }
// Remove finalizer
controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Error("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
return
}
pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))
}
return
}
// Add finalizer if not present
if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
pkg.Logger.Debug("[finalizer] adding job")
// Update the resource to add the finalizer
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Error("[finalizer] failed to add", zap.Error(err))
return return
} }
}
// Add finalizer if not present
if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
pkg.Logger.Debug("[finalizer] adding job")
// Update the resource to add the finalizer
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Error("[finalizer] failed to add", zap.Error(err))
return
}
}
}()
} }
} }

View File

@ -80,3 +80,12 @@ func (job ManagedJob) trackSavepoint() error {
return nil return nil
} }
func (job ManagedJob) TriggerSavepoint() error {
err := job.createSavepoint()
if err != nil {
return err
}
err = job.trackSavepoint()
return err
}

View File

@ -1,6 +1,7 @@
package controller package controller
import ( import (
"archive/zip"
"bufio" "bufio"
"context" "context"
"errors" "errors"
@ -12,7 +13,6 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"github.com/danielgtaylor/huma/v2" "github.com/danielgtaylor/huma/v2"
@ -62,6 +62,18 @@ func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
} }
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) { func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Run(false)
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager() mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId)) job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Run(true) err := job.Run(true)
@ -91,6 +103,30 @@ func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
}}, nil }}, nil
} }
type JobTriggerSavepointReq struct {
JobUId string `path:"uid"`
}
type JobTriggerSavepointRespBody struct {
Success bool `json:"success"`
}
type JobTriggerSavepointResp struct {
Body JobTriggerSavepointRespBody
}
func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.TriggerSavepoint()
if err != nil {
return nil, err
}
return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{
Success: true,
}}, nil
}
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) { func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
folderPath := req.SavepointPath // Change this to your folder path folderPath := req.SavepointPath // Change this to your folder path
@ -102,15 +138,66 @@ func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*h
zap.String("folderPath", folderPath), zap.String("folderPath", folderPath),
) )
// Run the zip command // Create the zip file
cmd := exec.Command("zip", "-r", zipFilePath, folderPath) zipFile, err := os.Create(zipFilePath)
// Capture any output or errors
output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to run zip command: %v\nOutput: %s", err, string(output)) fmt.Println("Error creating zip file:", err)
return nil, nil
} }
pkg.Logger.Debug("[controller] [savepoint]", zap.Any("zip command output", string(output))) defer zipFile.Close()
// Create a new zip writer
zipWriter := zip.NewWriter(zipFile)
defer zipWriter.Close()
// Walk through the source directory and add files to the zip
err = filepath.Walk(folderPath, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Create a new file header
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
// Set the header name to the relative path
header.Name, err = filepath.Rel(folderPath, filePath)
if err != nil {
return err
}
// If it's a directory, add a trailing slash
if info.IsDir() {
header.Name += "/"
} else {
// Set the compression method
header.Method = zip.Deflate
}
// Create a new writer for the file
writer, err := zipWriter.CreateHeader(header)
if err != nil {
return err
}
// If it's a directory, we're done
if info.IsDir() {
return nil
}
// Open the file to be zipped
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// Copy the file content to the zip writer
_, err = io.Copy(writer, file)
return err
})
// Open the zip file for reading // Open the zip file for reading
zipFileReader, err := os.Open(zipFilePath) zipFileReader, err := os.Open(zipFilePath)

View File

@ -35,6 +35,15 @@ func initRouter(api huma.API) {
Tags: []string{"Job"}, Tags: []string{"Job"},
}, controller.StartJob) }, controller.StartJob)
huma.Register(api, huma.Operation{
OperationID: "resume-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/resume",
Summary: "Resume Job",
Description: "Resume Flink Job",
Tags: []string{"Job"},
}, controller.ResumeJob)
huma.Register(api, huma.Operation{ huma.Register(api, huma.Operation{
OperationID: "remove-jar", OperationID: "remove-jar",
Method: http.MethodPost, Method: http.MethodPost,
@ -62,4 +71,12 @@ func initRouter(api huma.API) {
Tags: []string{"Savepoint"}, Tags: []string{"Savepoint"},
}, controller.DownloadSavepoint) }, controller.DownloadSavepoint)
huma.Register(api, huma.Operation{
OperationID: "trigger-savepoint",
Method: http.MethodPost,
Path: "/jobs/{uid}/trigger-savepoint",
Summary: "Trigger Savepoint",
Description: "Trigger Savepoint",
Tags: []string{"Savepoint"},
}, controller.TriggerSavepoint)
} }