package controller

import (
	"archive/zip"
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"flink-kube-operator/internal/crd"
	"flink-kube-operator/internal/crd/v1alpha1"
	"flink-kube-operator/internal/managed_job"
	"flink-kube-operator/internal/rest/types"
	"flink-kube-operator/pkg"

	"github.com/danielgtaylor/huma/v2"
	"go.uber.org/zap"
	k8sTypes "k8s.io/apimachinery/pkg/types"
)

// GetJobsReq is the (empty) request for GetJobs.
type GetJobsReq struct {
}

// GetJobsResp carries the list of FlinkJob resources returned by GetJobs.
type GetJobsResp struct {
	Body []v1alpha1.FlinkJob
}

// GetJobs returns every job known to the crd package, with ManagedFields
// cleared on each returned copy.
func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
	// Start from an empty, non-nil slice so an empty result serializes as a
	// list rather than null.
	jobs := []v1alpha1.FlinkJob{}
	for _, key := range crd.GetAllJobKeys() {
		job := crd.GetJob(key)
		job.ManagedFields = nil
		jobs = append(jobs, job)
	}
	return &GetJobsResp{Body: jobs}, nil
}

// StopJobReq identifies a job by the UID taken from the "uid" path parameter.
// It is shared by the stop, start, resume, pause and remove-jar handlers.
type StopJobReq struct {
	JobUId string `path:"uid"`
}

// StopJobRespBody is the JSON body reporting whether the operation succeeded.
type StopJobRespBody struct {
	Success bool `json:"success"`
}

// StopJobResp is the response shared by the job life-cycle handlers.
type StopJobResp struct {
	Body StopJobRespBody
}

// StopJob looks up the managed job with the requested UID and calls Stop on
// it. The error from Stop, if any, is returned unchanged.
//
// NOTE(review): the result of mgr.GetJob is used without a presence check in
// all handlers of this file; confirm how GetJob behaves for an unknown UID.
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	if err := job.Stop(); err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{Success: true}}, nil
}

// StartJob looks up the managed job with the requested UID and calls
// Run(false) on it. The error from Run, if any, is returned unchanged.
//
// NOTE(review): presumably the boolean selects "resume from saved state";
// confirm against managed_job.
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	if err := job.Run(false); err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{Success: true}}, nil
}

// ResumeJob looks up the managed job with the requested UID and calls
// Run(true) on it. The error from Run, if any, is returned unchanged.
func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	if err := job.Run(true); err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{Success: true}}, nil
}

// RemoveJobJar looks up the managed job with the requested UID, calls
// RemoveJar on it and always reports success.
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	job.RemoveJar()
	return &StopJobResp{Body: StopJobRespBody{Success: true}}, nil
}

// PauseJob looks up the managed job with the requested UID, calls Pause on it
// and always reports success.
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	job.Pause()
	return &StopJobResp{Body: StopJobRespBody{Success: true}}, nil
}

// JobTriggerSavepointReq identifies a job by the UID taken from the "uid"
// path parameter.
type JobTriggerSavepointReq struct {
	JobUId string `path:"uid"`
}

// JobTriggerSavepointRespBody is the JSON body reporting whether the
// savepoint was triggered.
type JobTriggerSavepointRespBody struct {
	Success bool `json:"success"`
}

// JobTriggerSavepointResp is the response of TriggerSavepoint.
type JobTriggerSavepointResp struct {
	Body JobTriggerSavepointRespBody
}

// TriggerSavepoint looks up the managed job with the requested UID and calls
// TriggerSavepoint on it. The error from that call, if any, is returned
// unchanged.
func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	if err := job.TriggerSavepoint(); err != nil {
		return nil, err
	}
	return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{Success: true}}, nil
}

// DownloadSavepoint zips the directory named by req.SavepointPath into a
// uniquely named file in the working directory and returns a streaming
// response that sends the archive as "savepoint.zip".
//
// The archive is fully written (including the zip central directory) before
// its size is read, so the Content-Length header matches the streamed bytes.
// The archive file is closed and removed once the stream body has run, and
// also on every error path of this function.
//
// An error is returned when the working directory cannot be resolved, the
// archive cannot be created, or walking/compressing the directory fails.
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
	folderPath := req.SavepointPath

	workDir, err := filepath.Abs(".")
	if err != nil {
		return nil, fmt.Errorf("resolving working directory: %w", err)
	}

	// A unique name per request keeps concurrent downloads from overwriting
	// each other's archive.
	zipFile, err := os.CreateTemp(workDir, "savepoint-*.zip")
	if err != nil {
		return nil, fmt.Errorf("creating savepoint zip file: %w", err)
	}
	zipFilePath := zipFile.Name()

	pkg.Logger.Debug("[controller] [savepoint]",
		zap.String("zipFileName", zipFilePath),
		zap.String("folderPath", folderPath),
	)

	// discard releases the archive when it will not be streamed.
	discard := func() {
		// The close error is irrelevant here: the file is being thrown away.
		_ = zipFile.Close()
		removeSavepointZip(zipFilePath)
	}

	if err := zipSavepointDir(zipFile, folderPath); err != nil {
		discard()
		return nil, fmt.Errorf("zipping savepoint %q: %w", folderPath, err)
	}

	fileInfo, err := zipFile.Stat()
	if err != nil {
		discard()
		return nil, fmt.Errorf("reading zip file info: %w", err)
	}
	// Rewind so the same handle can be streamed from the beginning.
	if _, err := zipFile.Seek(0, io.SeekStart); err != nil {
		discard()
		return nil, fmt.Errorf("rewinding zip file: %w", err)
	}
	size := fileInfo.Size()

	return &huma.StreamResponse{
		Body: func(hctx huma.Context) {
			// Deferred calls run last-in first-out: close, then remove.
			defer removeSavepointZip(zipFilePath)
			defer zipFile.Close()

			hctx.SetHeader("Content-Type", "application/zip")
			hctx.SetHeader("Content-Length", fmt.Sprintf("%d", size))
			hctx.SetHeader("Content-Disposition", `attachment; filename="savepoint.zip"`)

			if _, err := io.Copy(hctx.BodyWriter(), bufio.NewReader(zipFile)); err != nil {
				// Headers are already sent, so the failure can only be logged.
				pkg.Logger.Error("[controller] [savepoint] streaming zip file",
					zap.String("zipFileName", zipFilePath),
					zap.Error(err),
				)
			}
		},
	}, nil
}

// zipSavepointDir writes the contents of srcDir to dst as a zip archive.
// Entry names are relative to srcDir and use forward slashes; directories get
// a trailing slash and files are deflate-compressed. The root directory itself
// gets no entry. The zip writer is always closed, so on success dst holds a
// complete archive.
func zipSavepointDir(dst io.Writer, srcDir string) error {
	zw := zip.NewWriter(dst)

	walkErr := filepath.Walk(srcDir, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		rel, err := filepath.Rel(srcDir, filePath)
		if err != nil {
			return err
		}
		if rel == "." {
			if info.IsDir() {
				// The root directory itself needs no entry.
				return nil
			}
			// srcDir is a single file: store it under its own name.
			rel = info.Name()
		}

		header, err := zip.FileInfoHeader(info)
		if err != nil {
			return err
		}
		// Zip entry names always use forward slashes.
		header.Name = filepath.ToSlash(rel)

		if info.IsDir() {
			header.Name += "/"
			_, err = zw.CreateHeader(header)
			return err
		}

		header.Method = zip.Deflate
		entry, err := zw.CreateHeader(header)
		if err != nil {
			return err
		}
		return copySavepointFile(entry, filePath)
	})

	// Close unconditionally: it writes the central directory on success and
	// must not be skipped when the walk failed.
	closeErr := zw.Close()
	if walkErr != nil {
		return walkErr
	}
	return closeErr
}

// copySavepointFile copies the content of the file at path into dst.
func copySavepointFile(dst io.Writer, path string) error {
	src, err := os.Open(path)
	if err != nil {
		return err
	}
	defer src.Close()

	_, err = io.Copy(dst, src)
	return err
}

// removeSavepointZip deletes the archive at path, logging any failure other
// than the file already being gone.
func removeSavepointZip(path string) {
	if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
		pkg.Logger.Error("[controller] [savepoint] removing zip file",
			zap.String("zipFileName", path),
			zap.Error(err),
		)
	}
}