// Package jar downloads a jar from a URI into a temporary local file and
// uploads it through a Flink API client.
package jar

import (
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/cookiejar"
	"os"
	"strings"

	"flink-kube-operator/pkg"

	api "github.com/logi-camp/go-flink-client"
	"go.uber.org/zap"
)

const (
	// maxErrorBodyBytes bounds how much of a non-2xx response body is copied
	// into the returned error message.
	maxErrorBodyBytes = 4096

	// maxRedirects is the number of redirects followed before the download
	// is aborted.
	maxRedirects = 10
)

// JarFile is a jar identified by a URI together with the path of its local
// temporary copy and optional HTTP basic-auth credentials for fetching it.
type JarFile struct {
	uri               string
	filePath          string
	basicAuthUsername *string
	basicAuthPassword *string
}

// NewJarFile builds a JarFile for URI and immediately downloads it.
//
// Basic authentication is used only when both basicAuthUsername and
// basicAuthPassword are non-nil. A download failure is returned as the error
// and no JarFile is returned in that case.
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
	jarFile := &JarFile{
		uri:               URI,
		basicAuthUsername: basicAuthUsername,
		basicAuthPassword: basicAuthPassword,
	}
	if err := jarFile.Download(); err != nil {
		return nil, err
	}
	return jarFile, nil
}

// Upload sends the local jar copy through flinkClient.UploadJar and returns
// the last path segment of the file name reported in the response.
//
// The local copy is removed when Upload returns, whether or not the upload
// succeeded. An error is returned when the client call fails or when the
// response status is not "success".
func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err error) {
	// delete logs its own failures, so its result is not inspected here.
	defer jarFile.delete()

	resp, err := flinkClient.UploadJar(jarFile.filePath)
	if err != nil {
		pkg.Logger.Error("[main] error uploading jar", zap.Error(err))
		// Stop here: the response carries no usable data, and continuing
		// would replace this error with a generic one.
		return "", err
	}

	// Keep only what follows the final "/" of the reported file name.
	fileName = resp.FileName[strings.LastIndex(resp.FileName, "/")+1:]
	if resp.Status != "success" {
		return fileName, errors.New("jar upload was not success")
	}
	return fileName, nil
}

// Download fetches the jar from its URI into a randomly named file under
// /tmp and records that path on the receiver.
//
// On any failure after the file has been created, the partial file is
// removed before the error is returned.
func (jarFile *JarFile) Download() error {
	randBytes := make([]byte, 16)
	if _, err := rand.Read(randBytes); err != nil {
		return fmt.Errorf("generating temporary jar name: %w", err)
	}
	jarFile.filePath = "/tmp/" + hex.EncodeToString(randBytes) + ".jar"

	out, err := os.Create(jarFile.filePath)
	if err != nil {
		return err
	}

	if err := jarFile.fetch(out); err != nil {
		// Best effort: the download already failed and the file is removed
		// next, so a close error adds nothing.
		_ = out.Close()
		jarFile.delete()
		return err
	}

	// A failed close can mean the data never reached disk, so it is treated
	// as a failed download.
	if err := out.Close(); err != nil {
		jarFile.delete()
		return fmt.Errorf("closing %s: %w", jarFile.filePath, err)
	}
	return nil
}

// fetch performs the HTTP request and streams a successful response body
// into dst. Request errors and responses with a status code above 299 are
// logged and returned.
func (jarFile *JarFile) fetch(dst io.Writer) error {
	resp, err := jarFile.get()
	if err != nil {
		pkg.Logger.Error("error in downloading jar", zap.Error(err))
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode > 299 {
		// The body is only extra context for the message; a read error still
		// leaves the status to report, so it is deliberately ignored.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		err = errors.New(string(respBody) + " status:" + resp.Status)
		pkg.Logger.Error("error in downloading jar", zap.Error(err))
		return err
	}

	_, err = io.Copy(dst, resp.Body)
	return err
}

// get issues the GET request for the jar URI. It uses a plain http.Get
// unless both basic-auth credentials are set, in which case the request and
// every followed redirect carry an Authorization header.
func (jarFile *JarFile) get() (*http.Response, error) {
	if jarFile.basicAuthUsername == nil || jarFile.basicAuthPassword == nil {
		return http.Get(jarFile.uri)
	}

	credentials := *jarFile.basicAuthUsername + ":" + *jarFile.basicAuthPassword
	authorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(credentials))

	// cookiejar.New is the documented constructor; a bare &cookiejar.Jar{}
	// is not initialised.
	cookies, err := cookiejar.New(nil)
	if err != nil {
		return nil, err
	}

	client := &http.Client{
		Jar: cookies,
		// NOTE(review): the credentials are attached to every redirect
		// target, including other hosts — confirm this is intended.
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			// Supplying CheckRedirect replaces the client's default policy,
			// so the redirect cap has to be enforced here.
			if len(via) >= maxRedirects {
				return fmt.Errorf("stopped after %d redirects", maxRedirects)
			}
			// Set rather than Add so the header is never duplicated when it
			// is already present on the redirected request.
			req.Header.Set("Authorization", authorization)
			return nil
		},
	}

	req, err := http.NewRequest(http.MethodGet, jarFile.uri, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", authorization)
	return client.Do(req)
}

// delete removes the local jar copy at filePath. The attempt is logged, and
// so is any removal error, which is also returned.
func (jarFile *JarFile) delete() error {
	pkg.Logger.Info("[jar] [delete]", zap.String("path", jarFile.filePath))
	err := os.Remove(jarFile.filePath)
	if err != nil {
		pkg.Logger.Error("[jar] [delete]", zap.Error(err))
	}
	return err
}