254 lines
5.7 KiB
Go

package controller
import (
"archive/zip"
"bufio"
"context"
"errors"
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/managed_job"
"flink-kube-operator/internal/rest/types"
"flink-kube-operator/pkg"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/danielgtaylor/huma/v2"
"go.uber.org/zap"
k8sTypes "k8s.io/apimachinery/pkg/types"
)
// GetJobsReq is the input of GetJobs. It carries no parameters.
type GetJobsReq struct{}
// GetJobsResp is the output of GetJobs.
type GetJobsResp struct {
	// Body holds the jobs collected by GetJobs.
	Body []v1alpha1.FlinkJob
}
// GetJobs collects the job for every key reported by crd.GetAllJobKeys.
//
// Each job is fetched with crd.GetJob and has its ManagedFields cleared before
// it is added to the response. The returned body is an empty, non-nil slice
// when there are no keys. The error result is always nil.
func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
	resp := &GetJobsResp{Body: make([]v1alpha1.FlinkJob, 0)}
	for _, jobKey := range crd.GetAllJobKeys() {
		item := crd.GetJob(jobKey)
		item.ManagedFields = nil
		resp.Body = append(resp.Body, item)
	}
	return resp, nil
}
// StopJobReq identifies the job a request targets. It is the input of
// StopJob, StartJob, ResumeJob, RemoveJobJar and PauseJob.
type StopJobReq struct {
	// JobUId is the job's UID, tagged to be read from the "uid" path parameter.
	JobUId string `path:"uid"`
}
// StopJobRespBody is the JSON payload returned by the job action handlers
// that respond with StopJobResp.
type StopJobRespBody struct {
	// Success is serialized as "success"; the handlers in this file set it to
	// true whenever they return a response.
	Success bool `json:"success"`
}
// StopJobResp is the output of StopJob, StartJob, ResumeJob, RemoveJobJar and
// PauseJob.
type StopJobResp struct {
	// Body is the payload reporting the outcome of the action.
	Body StopJobRespBody
}
// StopJob looks up the managed job with the requested UID and calls Stop on it.
//
// When Stop fails, its error is returned as-is with a nil response. Otherwise
// the response body has Success set to true.
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	if stopErr := target.Stop(); stopErr != nil {
		return nil, stopErr
	}
	resp := &StopJobResp{}
	resp.Body.Success = true
	return resp, nil
}
// StartJob looks up the managed job with the requested UID and calls
// Run(false) on it.
//
// When Run fails, its error is returned as-is with a nil response. Otherwise
// the response body has Success set to true.
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	if runErr := target.Run(false); runErr != nil {
		return nil, runErr
	}
	resp := &StopJobResp{}
	resp.Body.Success = true
	return resp, nil
}
// ResumeJob looks up the managed job with the requested UID and calls
// Run(true) on it. StartJob makes the same call with false.
//
// When Run fails, its error is returned as-is with a nil response. Otherwise
// the response body has Success set to true.
func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	if runErr := target.Run(true); runErr != nil {
		return nil, runErr
	}
	resp := &StopJobResp{}
	resp.Body.Success = true
	return resp, nil
}
// RemoveJobJar looks up the managed job with the requested UID and calls
// RemoveJar on it.
//
// The handler does not inspect any outcome of RemoveJar: it always returns a
// response with Success set to true and a nil error.
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	target.RemoveJar()
	done := StopJobRespBody{Success: true}
	return &StopJobResp{Body: done}, nil
}
// PauseJob looks up the managed job with the requested UID and calls Pause on
// it.
//
// The handler does not inspect any outcome of Pause: it always returns a
// response with Success set to true and a nil error.
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	target.Pause()
	done := StopJobRespBody{Success: true}
	return &StopJobResp{Body: done}, nil
}
// JobTriggerSavepointReq is the input of TriggerSavepoint.
type JobTriggerSavepointReq struct {
	// JobUId is the job's UID, tagged to be read from the "uid" path parameter.
	JobUId string `path:"uid"`
}
// JobTriggerSavepointRespBody is the JSON payload returned by
// TriggerSavepoint.
type JobTriggerSavepointRespBody struct {
	// Success is serialized as "success"; TriggerSavepoint sets it to true
	// whenever it returns a response.
	Success bool `json:"success"`
}
// JobTriggerSavepointResp is the output of TriggerSavepoint.
type JobTriggerSavepointResp struct {
	// Body is the payload reporting the outcome of the trigger.
	Body JobTriggerSavepointRespBody
}
// TriggerSavepoint looks up the managed job with the requested UID and calls
// TriggerSavepoint on it.
//
// When the call fails, its error is returned as-is with a nil response.
// Otherwise the response body has Success set to true.
func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
	manager := managed_job.GetManager()
	target := manager.GetJob(k8sTypes.UID(req.JobUId))
	if triggerErr := target.TriggerSavepoint(); triggerErr != nil {
		return nil, triggerErr
	}
	resp := &JobTriggerSavepointResp{}
	resp.Body.Success = true
	return resp, nil
}
// DownloadSavepoint zips the directory of the job's last savepoint and streams
// the archive to the client as an attachment named "savepoint.zip".
//
// It responds with 404 when the job has no savepoint path registered. Any
// failure while staging or building the archive is returned as a wrapped
// error instead of being swallowed.
//
// The archive is staged in a uniquely named file in the working directory, so
// concurrent downloads cannot overwrite each other. The staging file is
// removed once the response body has been written, or immediately when
// building the archive fails.
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUID))
	lastSavepointPath := job.GetLastSavepointPath()
	if lastSavepointPath == nil {
		return nil, huma.Error404NotFound("there is no savepoint path is registered for the job")
	}
	// TrimPrefix removes only the literal "file:" scheme. TrimLeft would treat
	// "file:" as a character set and also strip leading 'f', 'i', 'l', 'e' or
	// ':' characters that belong to the path itself.
	folderPath := strings.TrimPrefix(*lastSavepointPath, "file:")

	zipFile, err := os.CreateTemp(".", "savepoint-*.zip")
	if err != nil {
		return nil, fmt.Errorf("creating zip file: %w", err)
	}
	zipFilePath := zipFile.Name()
	if abs, absErr := filepath.Abs(zipFilePath); absErr == nil {
		zipFilePath = abs
	}
	pkg.Logger.Debug("[controller] [savepoint]",
		zap.String("zipFileName", zipFilePath),
		zap.String("folderPath", folderPath),
	)

	// discard is best-effort cleanup for every path that does not hand the
	// staging file over to the response body.
	discard := func() {
		zipFile.Close()
		os.Remove(zipFilePath)
	}

	if err = zipSavepointDir(folderPath, zipFile); err != nil {
		discard()
		return nil, fmt.Errorf("zipping savepoint %q: %w", folderPath, err)
	}

	// The size is read only after the archive has been finalized; otherwise
	// Content-Length would not account for the central directory.
	fileInfo, err := zipFile.Stat()
	if err != nil {
		discard()
		return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
	}
	// Rewind so the same handle can be streamed back without reopening.
	if _, err = zipFile.Seek(0, io.SeekStart); err != nil {
		discard()
		return nil, fmt.Errorf("rewinding zip file: %w", err)
	}
	size := fileInfo.Size()

	resp := &huma.StreamResponse{
		Body: func(hctx huma.Context) {
			// Deferred calls run last-in-first-out: close the handle, then
			// delete the staging file.
			defer os.Remove(zipFilePath)
			defer zipFile.Close()

			hctx.SetHeader("Content-Type", "application/zip")
			hctx.SetHeader("Content-Length", fmt.Sprintf("%d", size))
			// A fixed name keeps the server's file system layout out of the
			// response headers.
			hctx.SetHeader("Content-Disposition", `attachment; filename="savepoint.zip"`)

			if _, copyErr := io.Copy(hctx.BodyWriter(), bufio.NewReader(zipFile)); copyErr != nil {
				pkg.Logger.Error("[controller] [savepoint] streaming zip failed", zap.Error(copyErr))
			}
		},
	}
	return resp, nil
}

// zipSavepointDir writes every entry below root into a zip archive on dst.
// Entry names are slash-separated paths relative to root. The archive is
// finalized before returning, so dst holds a complete zip on success.
func zipSavepointDir(root string, dst io.Writer) error {
	zipWriter := zip.NewWriter(dst)
	walkErr := filepath.Walk(root, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		name, err := filepath.Rel(root, filePath)
		if err != nil {
			return err
		}
		if name == "." {
			if info.IsDir() {
				// The root directory itself needs no entry; its contents are
				// stored relative to it.
				return nil
			}
			// root is a single file: store it under its own name.
			name = info.Name()
		}
		return addZipEntry(zipWriter, filePath, name, info)
	})
	// Close flushes buffered data and writes the central directory. It must
	// run even when the walk failed so the writer's resources are released.
	return errors.Join(walkErr, zipWriter.Close())
}

// addZipEntry adds the file or directory at filePath to zipWriter under name.
// Directories become entries with a trailing slash; files are deflated.
func addZipEntry(zipWriter *zip.Writer, filePath, name string, info os.FileInfo) error {
	header, err := zip.FileInfoHeader(info)
	if err != nil {
		return err
	}
	// Zip entry names always use forward slashes, whatever the host OS uses.
	header.Name = filepath.ToSlash(name)
	if info.IsDir() {
		header.Name += "/"
		_, err = zipWriter.CreateHeader(header)
		return err
	}
	header.Method = zip.Deflate

	entry, err := zipWriter.CreateHeader(header)
	if err != nil {
		return err
	}
	file, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	_, err = io.Copy(entry, file)
	return err
}