From ac8453bb57bf80cb09ee2881beb5d1569a29d215 Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sun, 1 Dec 2024 02:05:21 +0330 Subject: [PATCH] feat: add flinkApiUrl and database path configs --- cmd/operator/main.go | 25 +++++++++---------------- config.yaml | 3 ++- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 28054f2..43bb00d 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -1,6 +1,7 @@ package main import ( + "flink-kube-operator/internal/config" "flink-kube-operator/internal/crd" "flink-kube-operator/internal/manager" "fmt" @@ -16,17 +17,21 @@ import ( func main() { lc.Logger.Debug("start") + // load yaml config file + config := lc.LoadYamlConfig[config.Config]("./config.yaml") + + // init kubernetes flink job crd watch crd.New() // create database instance - db, err := buntdb.Open("./bunt.db") - + db, err := buntdb.Open(config.DatabasePath) if err != nil { lc.Logger.Fatal("[main] error on open db", zap.Error(err)) } defer db.Close() - c, err := api.New("127.0.0.1:8981") + // create flink api instance + c, err := api.New(config.FlinkApiUrl) if err != nil { panic(err) } @@ -38,21 +43,9 @@ func main() { } fmt.Println(clusterConfig) - jars, err := c.Jars() - if err != nil { - lc.Logger.Error("error on getting jars", zap.Error(err)) - } - lc.Logger.Debug("[main] jars", zap.Any("jars", jars)) - - jobs, err := c.Jobs() - if err != nil { - lc.Logger.Error("error on getting jars", zap.Error(err)) - } - lc.Logger.Debug("[main] jobs", zap.Any("jobs", jobs)) - + // init flink job manager manager.Setup(c, db) - //config := lc.LoadYamlConfig[config.Config]("./config.yaml") // for _, jobDef := range config.Jobs { // managed_job.NewManagedJob(c, db, jobDef) // } diff --git a/config.yaml b/config.yaml index 6cd2f8b..27c6045 100644 --- a/config.yaml +++ b/config.yaml @@ -1 +1,2 @@ -jobs: +flinkApiUrl: 127.0.0.1:8981 +databasePath: ./bunt.db \ No newline at end of file