Mohammadreza Khani 830e265162 feat: apply new helm structure
use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
2025-04-05 01:39:02 +03:30

124 lines
2.9 KiB
Go

package jar
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"os"
"strings"
"flink-kube-operator/pkg"
api "github.com/logi-camp/go-flink-client"
"go.uber.org/zap"
)
// JarFile describes a jar that is fetched from a remote location into a
// local file so that it can later be uploaded to Flink.
type JarFile struct {
	// uri is the address the jar is downloaded from.
	uri string
	// filePath is the local path the downloaded jar is written to; it is
	// assigned by Download.
	filePath string

	// Optional HTTP Basic credentials. They are only sent when both
	// pointers are non-nil.
	basicAuthUsername, basicAuthPassword *string
}
// NewJarFile creates a JarFile for URI with the optional basic-auth
// credentials and downloads it right away. If the download fails, the error
// is returned and no JarFile is handed back.
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
	created := new(JarFile)
	created.uri = URI
	created.basicAuthUsername = basicAuthUsername
	created.basicAuthPassword = basicAuthPassword

	if downloadErr := created.Download(); downloadErr != nil {
		return nil, downloadErr
	}
	return created, nil
}
// Upload sends the local jar at jarFile.filePath to Flink through flinkClient
// and returns the last path segment of the file name reported in the upload
// response.
//
// The local file is removed when Upload returns, on success and on failure
// alike. If the client call itself fails, that error is returned unchanged;
// if the call succeeds but the response status is not "success", an error is
// returned together with the reported file name.
func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err error) {
	// Remove the local copy on every exit path; delete logs its own failures,
	// so its return value is intentionally not inspected here.
	defer jarFile.delete()

	resp, err := flinkClient.UploadJar(jarFile.filePath)
	if err != nil {
		pkg.Logger.Error("[main] error uploading jar", zap.Error(err))
		// Stop here: the response carries no usable data, and continuing
		// would replace the real cause with a generic status error.
		return "", err
	}

	// Keep only what follows the final "/" (the whole name when there is none).
	fileName = resp.FileName[strings.LastIndex(resp.FileName, "/")+1:]
	if resp.Status != "success" {
		return fileName, errors.New("jar upload was not success")
	}
	return fileName, nil
}
// Download fetches the jar from jarFile.uri into a randomly named ".jar" file
// under /tmp and stores that location in jarFile.filePath.
//
// When both basic-auth fields are set the request carries an HTTP Basic
// Authorization header; otherwise a plain GET is issued. A transport error, a
// status code above 299, or a failure while writing the file is logged, the
// partially written file is removed, and the error is returned.
func (jarFile *JarFile) Download() error {
	randBytes := make([]byte, 16)
	if _, err := rand.Read(randBytes); err != nil {
		return err
	}
	jarFile.filePath = "/tmp/" + hex.EncodeToString(randBytes) + ".jar"

	out, err := os.Create(jarFile.filePath)
	if err != nil {
		return err
	}

	err = jarFile.streamJarTo(out)
	// Always close the file, and surface a close failure when nothing else
	// went wrong: it can mean the data never fully reached the disk.
	if closeErr := out.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		pkg.Logger.Error("error in downloading jar", zap.Error(err))
		// Do not leave a partial jar behind; delete logs its own failures.
		_ = jarFile.delete()
		return err
	}
	return nil
}

// streamJarTo requests the jar and copies the response body into dst. A
// status code above 299 is reported as an error that includes the beginning
// of the response body and the status line.
func (jarFile *JarFile) streamJarTo(dst io.Writer) error {
	resp, err := jarFile.openJarResponse()
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode > 299 {
		// Bound how much of an error page ends up in the message.
		const maxErrorBody = 4096
		// Best effort: if reading the body fails, the status is still reported.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		return errors.New(string(respBody) + " status:" + resp.Status)
	}

	_, err = io.Copy(dst, resp.Body)
	return err
}

// openJarResponse issues the GET for jarFile.uri, with HTTP Basic credentials
// when both the username and the password are set.
func (jarFile *JarFile) openJarResponse() (*http.Response, error) {
	if jarFile.basicAuthUsername == nil || jarFile.basicAuthPassword == nil {
		return http.Get(jarFile.uri)
	}

	credentials := *jarFile.basicAuthUsername + ":" + *jarFile.basicAuthPassword
	authHeader := "Basic " + base64.StdEncoding.EncodeToString([]byte(credentials))

	// A zero-value cookiejar.Jar has no backing map and panics as soon as a
	// response sets a cookie, so the jar has to come from cookiejar.New.
	cookies, err := cookiejar.New(nil)
	if err != nil {
		return nil, err
	}

	client := &http.Client{
		Jar: cookies,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			// A custom CheckRedirect replaces the default policy, so the
			// usual cap on consecutive redirects is re-applied here.
			if len(via) >= 10 {
				return errors.New("stopped after 10 redirects")
			}
			// NOTE(security): the credentials are re-sent on every redirect,
			// including redirects to a different host. Set (not Add) avoids
			// duplicating a header the client already copied.
			req.Header.Set("Authorization", authHeader)
			return nil
		},
	}

	req, err := http.NewRequest("GET", jarFile.uri, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", authHeader)
	return client.Do(req)
}
// delete removes the file at jarFile.filePath. The attempt is logged at info
// level beforehand; a failed removal is logged at error level and returned.
func (jarFile *JarFile) delete() error {
	target := jarFile.filePath
	pkg.Logger.Info("[jar] [delete]", zap.String("path", target))

	if removeErr := os.Remove(target); removeErr != nil {
		pkg.Logger.Error("[jar] [delete]", zap.Error(removeErr))
		return removeErr
	}
	return nil
}