From 3b0aff56882dc878b182722905416a30430b9d9c Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sat, 30 Nov 2024 01:22:51 +0330 Subject: [PATCH] feat: initialize --- .air.toml | 46 +++++++ .gitignore | 4 + .vscode/settings.json | 5 + cmd/operator/main.go | 69 ++++++++++ config.yaml | 5 + go.mod | 52 +++++++ go.sum | 219 ++++++++++++++++++++++++++++++ internal/config/config.type.go | 12 ++ internal/jar/jar.go | 70 ++++++++++ internal/managed_job/cycle.go | 71 ++++++++++ internal/managed_job/new.go | 26 ++++ internal/managed_job/run.go | 46 +++++++ internal/managed_job/savepoint.go | 20 +++ internal/managed_job/state.go | 55 ++++++++ internal/managed_job/status.go | 32 +++++ internal/managed_job/type.go | 29 ++++ 16 files changed, 761 insertions(+) create mode 100644 .air.toml create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 cmd/operator/main.go create mode 100644 config.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/config/config.type.go create mode 100644 internal/jar/jar.go create mode 100644 internal/managed_job/cycle.go create mode 100644 internal/managed_job/new.go create mode 100644 internal/managed_job/run.go create mode 100644 internal/managed_job/savepoint.go create mode 100644 internal/managed_job/state.go create mode 100644 internal/managed_job/status.go create mode 100644 internal/managed_job/type.go diff --git a/.air.toml b/.air.toml new file mode 100644 index 0000000..e6d7edb --- /dev/null +++ b/.air.toml @@ -0,0 +1,46 @@ +root = "." +testdata_dir = "testdata" +tmp_dir = "tmp" + +[build] + args_bin = [] + bin = "./tmp/main" + cmd = "go build -o ./tmp/main ./cmd/operator" + delay = 1000 + exclude_dir = ["assets", "tmp", "vendor", "testdata"] + exclude_file = [] + exclude_regex = ["_test.go"] + exclude_unchanged = false + follow_symlink = false + full_bin = "" + include_dir = [] + include_ext = ["go", "tpl", "tmpl", "html"] + include_file = [] + kill_delay = "0s" + log = "build-errors.log" + poll = false + poll_interval = 0 + post_cmd = [] + pre_cmd = [] + rerun = false + rerun_delay = 500 + send_interrupt = false + stop_on_error = true + +[color] + app = "" + build = "yellow" + main = "magenta" + runner = "green" + watcher = "cyan" + +[log] + main_only = false + time = false + +[misc] + clean_on_exit = false + +[screen] + clear_on_rebuild = false + keep_scroll = true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1331240 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +db +*.log +tmp +*.jar \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..62cdd69 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "cSpell.words": [ + "flink" + ] +} \ No newline at end of file diff --git a/cmd/operator/main.go b/cmd/operator/main.go new file mode 100644 index 0000000..8bbb233 --- /dev/null +++ b/cmd/operator/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "flink-kube-operator/internal/config" + "flink-kube-operator/internal/managed_job" + "fmt" + "log" + "os" + + "gitea.com/logicamp/lc" + "github.com/dgraph-io/badger/v4" + api "github.com/logi-camp/go-flink-client" + "go.uber.org/zap" +) + +func main() { + lc.Logger.Debug("start") + + db, err := badger.Open(badger.DefaultOptions("./db")) + if err != nil { + lc.Logger.Fatal("[main] error on open db", zap.Error(err)) + } + defer db.Close() + + c, err := api.New("127.0.0.1:8081") + if err != nil { + panic(err) + } + + // get cluster clusterConfig + clusterConfig, err := c.Config() + if err != nil { + panic(err) + } + fmt.Println(clusterConfig) + + jars, err := c.Jars() + if err != nil { + lc.Logger.Error("error on getting jars", zap.Error(err)) + } + lc.Logger.Debug("[main] jars", zap.Any("jars", jars)) + + jobs, err := c.Jobs() + if err != nil { + lc.Logger.Error("error on getting jars", zap.Error(err)) + } + lc.Logger.Debug("[main] jobs", zap.Any("jobs", jobs)) + + config := lc.LoadYamlConfig[config.Config]("./config.yaml") + for _, jobDef := range config.Jobs { + managed_job.NewManagedJob(c, db, jobDef) + } + + for _, job := range jobs.Jobs { + job, err := c.Job(job.ID) + if err != nil { + lc.Logger.Error("error getting job info", zap.Error(err)) + continue + } + if job.State == "RUNNING" { + lc.Logger.Debug("[main] running job", zap.String("jobId", job.ID)) + } + // lc.Logger.Debug("[main]", zap.Any("job", job)) + } + + cancelChan := make(chan os.Signal, 1) + sig := <-cancelChan + log.Printf("Caught signal %v", sig) +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..570cb87 --- /dev/null +++ b/config.yaml @@ -0,0 +1,5 @@ +jobs: + - key: price-processor + name: Price Processor + entryClass: top.bazargam.Main + jarURI: http://price-processor.bz2/price-processor-v0.0.1.jar diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..da28c6d --- /dev/null +++ b/go.mod @@ -0,0 +1,52 @@ +module flink-kube-operator + +go 1.23.2 + +require ( + gitea.com/logicamp/lc v1.14.6 // indirect + github.com/IBM/sarama v1.43.3 // indirect + github.com/arangodb/go-driver v1.6.4 // indirect + github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 // indirect + github.com/danielgtaylor/huma/v2 v2.26.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/badger/v4 v4.5.0 // indirect + github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/logi-camp/go-flink-client v0.0.3 // indirect + github.com/matoous/go-nanoid/v2 v2.1.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + go.opencensus.io v0.24.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/sys v0.27.0 // indirect + google.golang.org/protobuf v1.35.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2ea9b6e --- /dev/null +++ b/go.sum @@ -0,0 +1,219 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +gitea.com/logicamp/lc v1.14.6 h1:jgtrYwbqvWU/cxT9mDFCToK7D+tEpGyhV3FnSn3hKBw= +gitea.com/logicamp/lc v1.14.6/go.mod h1:P29QjvOVa1VnEZIFi+Mab7uRbO/qRrbZPdARSpi4qXs= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= +github.com/arangodb/go-driver v1.6.4 h1:tuizDP620e3OFaoFkEh07n4bHSTvCOvm5qVGQ/+zMrs= +github.com/arangodb/go-driver v1.6.4/go.mod h1:oLZCzkEGh82I2XBcMkrDtqsMXFg6QkIRgbqLGalh6jc= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 h1:VKnMT71Tl0dCp3lfGBp2D8eqQwc+amoDY5EeUgFHDDE= +github.com/confluentinc/confluent-kafka-go/v2 v2.6.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= +github.com/danielgtaylor/huma/v2 v2.26.0 h1:lON4pIcckuSQJNDi6WkOu0sS7mxvlNkTAGbc3BrRXTc= +github.com/danielgtaylor/huma/v2 v2.26.0/go.mod h1:NbSFXRoOMh3BVmiLJQ9EbUpnPas7D9BeOxF/pZBAGa0= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.5.0 h1:TeJE3I1pIWLBjYhIYCA1+uxrjWEoJXImFBMEBVSm16g= +github.com/dgraph-io/badger/v4 v4.5.0/go.mod h1:ysgYmIeG8dS/E8kwxT7xHyc7MkmwNYLRoYnFbr7387A= +github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ= +github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/flink-go/api v0.0.1 h1:YK3TBZLV4TivPhjJ+Z316b2mSg72sPy3IOcm21OyaO4= +github.com/flink-go/api v0.0.1/go.mod h1:nzM6X8dmpVRe3jqfQ4y05JeTIRWZPD4iT6kyEaFBG9A= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/logi-camp/go-flink-client v0.0.1 h1:VZI5ctoJEArwtBprIXZNk+cv5fEQZ4EcuKAbOY8Emvw= +github.com/logi-camp/go-flink-client v0.0.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.0.3 h1:NT9FYJG7jqroq44jfJfRnva3NO/jgpgURcU0mFysK1A= +github.com/logi-camp/go-flink-client v0.0.3/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE= +github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/config/config.type.go b/internal/config/config.type.go new file mode 100644 index 0000000..a3d026f --- /dev/null +++ b/internal/config/config.type.go @@ -0,0 +1,12 @@ +package config + +type JobDef struct { + Key string `yaml:"key"` + Name string `yaml:"name"` + EntryClass string `yaml:"entryClass"` + JarURI string `yaml:"jarURI"` +} + +type Config struct { + Jobs []JobDef `yaml:"jobs"` +} diff --git a/internal/jar/jar.go b/internal/jar/jar.go new file mode 100644 index 0000000..0211e06 --- /dev/null +++ b/internal/jar/jar.go @@ -0,0 +1,70 @@ +package jar + +import ( + "errors" + "io" + "net/http" + "os" + "strings" + + "gitea.com/logicamp/lc" + api "github.com/logi-camp/go-flink-client" + gonanoid "github.com/matoous/go-nanoid/v2" + "go.uber.org/zap" +) + +type JarFile struct { + uri string + filePath string +} + +func NewJarFile(URI string) (*JarFile, error) { + jarFile := &JarFile{ + uri: URI, + } + err := jarFile.Download() + if err != nil { + return nil, err + } + return jarFile, nil +} + +func (JarFile JarFile) Upload(flinkClient *api.Client) (fileName string, err error) { + + resp, err := flinkClient.UploadJar(JarFile.filePath) + if err != nil { + lc.Logger.Error("[main] error uploading jar", zap.Error(err)) + } + filePathParts := strings.Split(resp.FileName, "/") + fileName = filePathParts[len(filePathParts)-1] + if resp.Status != "success" { + err = errors.New("jar upload was not success") + } + return +} + +func (jarFile *JarFile) Download() error { + fileName, _ := gonanoid.New() + jarFile.filePath = fileName + ".jar" + out, err := os.Create(jarFile.filePath) + if err != nil { + return err + } + + defer out.Close() + resp, err := http.Get(jarFile.uri) + if err != nil { + return err + } + + defer resp.Body.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} + +func (jarFile JarFile) Delete() error { + return os.Remove(jarFile.filePath) +} diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go new file mode 100644 index 0000000..513c7d3 --- /dev/null +++ b/internal/managed_job/cycle.go @@ -0,0 +1,71 @@ +package managed_job + +import ( + "errors" + "time" + + "gitea.com/logicamp/lc" + "go.uber.org/zap" +) + +func (job *ManagedJob) startCycle() { + ticker := time.NewTicker(5 * time.Second) + quit := make(chan struct{}) + + // load job state from db + job.loadState() + go func() { + for { + select { + case <-ticker.C: + job.cycle() + case <-quit: + ticker.Stop() + return + } + } + }() +} + +func (job *ManagedJob) cycle() { + lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", job.def.Key)) + + // Init job + if job.state == nil { + err := job.upload() + if err != nil { + job.setError("[upload-error] " + err.Error()) + return + } + err = job.run() + if err != nil { + job.setError("[run-error] " + err.Error()) + return + } + return + } + // Check for set running or error state + if job.state.Status == JobStatusCreating { + err := job.checkStatus() + if errors.Is(err, ErrNoJobId) { + job.state = nil + } + return + } + + if job.state.Status == JobStatusRunning { + err := job.checkStatus() + if errors.Is(err, ErrNoJobId) { + job.state = nil + } + return + } + + //if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) { + err := job.createSavepoint() + if errors.Is(err, ErrNoJobId) { + job.state = nil + } + //} + +} diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go new file mode 100644 index 0000000..1a47f65 --- /dev/null +++ b/internal/managed_job/new.go @@ -0,0 +1,26 @@ +package managed_job + +import ( + "flink-kube-operator/internal/config" + + "github.com/dgraph-io/badger/v4" + api "github.com/logi-camp/go-flink-client" +) + +type ManagedJob struct { + def config.JobDef + client *api.Client + jarId string + db *badger.DB + state *jobState +} + +func NewManagedJob(client *api.Client, db *badger.DB, def config.JobDef) *ManagedJob { + job := &ManagedJob{ + def: def, + client: client, + db: db, + } + job.startCycle() + return job +} diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go new file mode 100644 index 0000000..0dbbf3a --- /dev/null +++ b/internal/managed_job/run.go @@ -0,0 +1,46 @@ +package managed_job + +import ( + "flink-kube-operator/internal/jar" + + "gitea.com/logicamp/lc" + api "github.com/logi-camp/go-flink-client" + "go.uber.org/zap" +) + +// upload jar file and set the jarId for later usages +func (job *ManagedJob) upload() error { + jarFile, err := jar.NewJarFile(job.def.JarURI) + if err != nil { + lc.Logger.Debug("[main] error on download jar", zap.Error(err)) + return err + } + fileName, err := jarFile.Upload(job.client) + jarFile.Delete() + if err != nil { + lc.Logger.Debug("[main] error on upload jar", zap.Error(err)) + return err + } + lc.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", fileName)) + + job.jarId = fileName + return nil +} + +// run the job from saved jarId in managedJob +func (job *ManagedJob) run() error { + runJarResp, err := job.client.RunJar(api.RunOpts{ + JarID: job.jarId, + AllowNonRestoredState: true, + EntryClass: job.def.EntryClass, + }) + if err != nil { + lc.Logger.Error("[managed-job] [run]", zap.Error(err)) + return err + } + lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp)) + + job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating}) + + return err +} diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go new file mode 100644 index 0000000..4b74183 --- /dev/null +++ b/internal/managed_job/savepoint.go @@ -0,0 +1,20 @@ +package managed_job + +import ( + "gitea.com/logicamp/lc" + "go.uber.org/zap" +) + +func (job ManagedJob) createSavepoint() error { + if job.state.JobId == nil { + lc.Logger.Debug("[managed-job] [savepoint] no job id") + return ErrNoJobId + } + resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false) + if err != nil { + lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) + return err + } + lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp)) + return nil +} diff --git a/internal/managed_job/state.go b/internal/managed_job/state.go new file mode 100644 index 0000000..225c420 --- /dev/null +++ b/internal/managed_job/state.go @@ -0,0 +1,55 @@ +package managed_job + +import ( + "encoding/json" + "errors" + "time" + + "github.com/dgraph-io/badger/v4" +) + +// get state of job from local db +func (job *ManagedJob) loadState() { + err := job.db.View( + func(tx *badger.Txn) error { + if val, err := tx.Get([]byte(job.def.Key)); err != nil { + return err + } else if val != nil { + val.Value(func(val []byte) error { + job.state = &jobState{} + return json.Unmarshal(val, job.state) + }) + } + return nil + }) + if errors.Is(err, badger.ErrKeyNotFound) { + err = nil + } +} + +// save state of job to local db +func (job *ManagedJob) updateState(state jobState) { + job.state = &state + + value, _ := json.Marshal(job.state) + job.db.Update(func(txn *badger.Txn) error { + err := txn.Set([]byte(job.def.Key), value) + if err != nil { + return err + } + return txn.Commit() + }) +} + +func (job *ManagedJob) setError(errMsg string) { + job.state.Error = &errMsg + job.state.Status = JobStatusError + job.updateState(*job.state) +} + +func (job *ManagedJob) setSavepointId(savepointId string) { + job.state.LastSavepointId = &savepointId + n := time.Now() + job.state.LastSavepointDate = &n + job.updateState(*job.state) +} diff --git a/internal/managed_job/status.go b/internal/managed_job/status.go new file mode 100644 index 0000000..5ca6fb1 --- /dev/null +++ b/internal/managed_job/status.go @@ -0,0 +1,32 @@ +package managed_job + +import ( + "strings" + + "gitea.com/logicamp/lc" + "go.uber.org/zap" +) + +func (job *ManagedJob) checkStatus() error { + if job.state.JobId == nil { + lc.Logger.Debug("[managed-job] [status] no job id") + return ErrNoJobId + } + statusResp, err := job.client.Job(*job.state.JobId) + if err != nil { + lc.Logger.Debug("[managed-job] [status] cannot fetch status", zap.Error(err)) + if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 { + job.updateState(jobState{ + JobId: job.state.JobId, + Status: JobStatusNotFound, + }) + } + return err + } + lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp)) + job.updateState(jobState{ + JobId: job.state.JobId, + Status: JobStatus(statusResp.State), + }) + return err +} diff --git a/internal/managed_job/type.go b/internal/managed_job/type.go new file mode 100644 index 0000000..0421b34 --- /dev/null +++ b/internal/managed_job/type.go @@ -0,0 +1,29 @@ +package managed_job + +import ( + "errors" + "time" +) + +type JobStatus string + +var ( + ErrNoJobId = errors.New("[managed-job] no job id") +) + +const ( + JobStatusRunning JobStatus = "RUNNING" + JobStatusCreating JobStatus = "CREATING" + JobStatusNotFound JobStatus = "NotFound" + JobStatusError JobStatus = "ERROR" + JobStatusReconciling JobStatus = "RECONCILING" +) + +type jobState struct { + Status JobStatus `json:"status"` + Error *string `json:"error"` + Info *string `json:"info"` + JobId *string `json:"job_id"` + LastSavepointId *string `json:"last_savepoint_id"` + LastSavepointDate *time.Time `json:"last_savepoint_time"` +}