From 6bd197b8127117ca6b097579ac1ba05c8103b2bd Mon Sep 17 00:00:00 2001 From: Mohammadreza Khani Date: Sat, 30 Nov 2024 23:36:44 +0330 Subject: [PATCH] feat: add manager --- .vscode/settings.json | 4 +++- cmd/operator/main.go | 35 ++++++++++++++-------------- config.yaml | 5 ---- go.mod | 2 +- go.sum | 2 ++ internal/config/config.type.go | 14 ------------ internal/crd/repo.go | 26 +++++++++++++++++++++ internal/crd/watch.go | 16 ++----------- internal/jar/jar.go | 4 ++-- internal/managed_job/cycle.go | 4 ++-- internal/managed_job/new.go | 6 ++--- internal/managed_job/run.go | 4 ++-- internal/managed_job/state.go | 12 +++++----- internal/manager/manager.go | 42 ++++++++++++++++++++++++++++++++++ 14 files changed, 109 insertions(+), 67 deletions(-) create mode 100644 internal/crd/repo.go create mode 100644 internal/manager/manager.go diff --git a/.vscode/settings.json b/.vscode/settings.json index 56c0029..d803532 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,9 @@ { "cSpell.words": [ "apiextensions", + "buntdb", "deepcopy", - "flink" + "flink", + "repsert" ] } \ No newline at end of file diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 6dc1aba..28054f2 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -1,9 +1,8 @@ package main import ( - "flink-kube-operator/internal/config" "flink-kube-operator/internal/crd" - "flink-kube-operator/internal/managed_job" + "flink-kube-operator/internal/manager" "fmt" "log" "os" @@ -51,22 +50,24 @@ func main() { } lc.Logger.Debug("[main] jobs", zap.Any("jobs", jobs)) - config := lc.LoadYamlConfig[config.Config]("./config.yaml") - for _, jobDef := range config.Jobs { - managed_job.NewManagedJob(c, db, jobDef) - } + manager.Setup(c, db) - for _, job := range jobs.Jobs { - job, err := c.Job(job.ID) - if err != nil { - lc.Logger.Error("error getting job info", zap.Error(err)) - continue - } - if job.State == "RUNNING" { - lc.Logger.Debug("[main] running job", zap.String("jobId", job.ID)) - } - // lc.Logger.Debug("[main]", zap.Any("job", job)) - } + //config := lc.LoadYamlConfig[config.Config]("./config.yaml") + // for _, jobDef := range config.Jobs { + // managed_job.NewManagedJob(c, db, jobDef) + // } + + // for _, job := range jobs.Jobs { + // job, err := c.Job(job.ID) + // if err != nil { + // lc.Logger.Error("error getting job info", zap.Error(err)) + // continue + // } + // if job.State == "RUNNING" { + // lc.Logger.Debug("[main] running job", zap.String("jobId", job.ID)) + // } + // // lc.Logger.Debug("[main]", zap.Any("job", job)) + // } cancelChan := make(chan os.Signal, 1) sig := <-cancelChan diff --git a/config.yaml b/config.yaml index 03a66c4..6cd2f8b 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1 @@ jobs: - - key: price-processor - name: Price Processor - entryClass: tech.logicamp.Main - jarURI: http://price-processor.bz2/price-processor-v0.0.1.jar - savepointInterval: 3m \ No newline at end of file diff --git a/go.mod b/go.mod index a7ac3c7..c9d5abd 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.2 require ( gitea.com/logicamp/lc v1.14.6 github.com/dgraph-io/badger/v4 v4.5.0 - github.com/logi-camp/go-flink-client v0.1.0 + github.com/logi-camp/go-flink-client v0.1.1 github.com/matoous/go-nanoid/v2 v2.1.0 github.com/tidwall/buntdb v1.3.2 go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index 9063724..a4a1cae 100644 --- a/go.sum +++ b/go.sum @@ -262,6 +262,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/logi-camp/go-flink-client v0.1.0 h1:uzBV6RGkyzZVdSQ8zAlUGbJ5hdXRspaPjzzYJUQ2aJU= github.com/logi-camp/go-flink-client v0.1.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= +github.com/logi-camp/go-flink-client v0.1.1 h1:N/XLXE/WZGPKgJo8pN+Q3JrHBOoZHwUoditd399coRo= +github.com/logi-camp/go-flink-client v0.1.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= diff --git a/internal/config/config.type.go b/internal/config/config.type.go index a940044..d912156 100644 --- a/internal/config/config.type.go +++ b/internal/config/config.type.go @@ -1,15 +1 @@ package config - -import "time" - -type JobDef struct { - Key string `yaml:"key"` - Name string `yaml:"name"` - EntryClass string `yaml:"entryClass"` - JarURI string `yaml:"jarURI"` - SavepointInterval time.Duration `yaml:"savepointInterval"` -} - -type Config struct { - Jobs []JobDef `yaml:"jobs"` -} diff --git a/internal/crd/repo.go b/internal/crd/repo.go new file mode 100644 index 0000000..c42ed3b --- /dev/null +++ b/internal/crd/repo.go @@ -0,0 +1,26 @@ +package crd + +import ( + "flink-kube-operator/internal/crd/v1alpha1" + "maps" + + "k8s.io/apimachinery/pkg/types" +) + +var jobs = map[types.UID]*v1alpha1.FlinkJob{} + +func (crd Crd) repsert(job *v1alpha1.FlinkJob) { + jobs[job.GetUID()] = job +} + +func GetJob(uid types.UID) v1alpha1.FlinkJob { + return *jobs[uid].DeepCopy() +} + +func GetAllJobKeys() []types.UID { + keys := []types.UID{} + for k := range maps.Keys(jobs) { + keys = append(keys, k) + } + return keys +} diff --git a/internal/crd/watch.go b/internal/crd/watch.go index 5a19b4f..cac29e2 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -40,25 +40,13 @@ func (crd Crd) watchFlinkJobs() { case watch.Modified: fmt.Printf("FlinkJob updated: %s\n", job.GetName()) // Handle the new FlinkJob - handleNewFlinkJob(job) + crd.repsert(job) case watch.Added: fmt.Printf("New FlinkJob created: %s\n", job.GetName()) // Handle the new FlinkJob - handleNewFlinkJob(job) + crd.repsert(job) case watch.Deleted: } } } - -func handleNewFlinkJob(job *v1alpha1.FlinkJob) { - // Extract job details - name := job.GetName() - namespace := job.GetNamespace() - - // Process job specification - fmt.Printf("Processing FlinkJob %s in namespace %s kind: %s \n", name, namespace, job.Kind) - - lc.Logger.Debug("[crd] [watch]", zap.Any("spec", job), zap.Any("name", job.Spec.Name)) - // Add your custom logic here -} diff --git a/internal/jar/jar.go b/internal/jar/jar.go index 0211e06..54fad6c 100644 --- a/internal/jar/jar.go +++ b/internal/jar/jar.go @@ -29,7 +29,7 @@ func NewJarFile(URI string) (*JarFile, error) { return jarFile, nil } -func (JarFile JarFile) Upload(flinkClient *api.Client) (fileName string, err error) { +func (JarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err error) { resp, err := flinkClient.UploadJar(JarFile.filePath) if err != nil { @@ -65,6 +65,6 @@ func (jarFile *JarFile) Download() error { return nil } -func (jarFile JarFile) Delete() error { +func (jarFile *JarFile) Delete() error { return os.Remove(jarFile.filePath) } diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 765adb1..c7b7312 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -28,7 +28,7 @@ func (job *ManagedJob) startCycle() { } func (job *ManagedJob) cycle() { - lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", job.def.Key)) + lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID))) // Init job if job.state == nil { @@ -58,7 +58,7 @@ func (job *ManagedJob) cycle() { if errors.Is(err, ErrNoJobId) { job.state = nil } - if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.SavepointInterval).After(*job.state.LastSavepointDate) { + if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate) { if job.state.SavepointTriggerId == nil { job.createSavepoint() } else { diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go index 2759e5d..1a29bb0 100644 --- a/internal/managed_job/new.go +++ b/internal/managed_job/new.go @@ -1,21 +1,21 @@ package managed_job import ( - "flink-kube-operator/internal/config" + "flink-kube-operator/internal/crd/v1alpha1" api "github.com/logi-camp/go-flink-client" "github.com/tidwall/buntdb" ) type ManagedJob struct { - def config.JobDef + def v1alpha1.FlinkJob client *api.Client jarId string db *buntdb.DB state *jobState } -func NewManagedJob(client *api.Client, db *buntdb.DB, def config.JobDef) *ManagedJob { +func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *ManagedJob { job := &ManagedJob{ def: def, client: client, diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index 0dbbf3a..af220e4 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -10,7 +10,7 @@ import ( // upload jar file and set the jarId for later usages func (job *ManagedJob) upload() error { - jarFile, err := jar.NewJarFile(job.def.JarURI) + jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) if err != nil { lc.Logger.Debug("[main] error on download jar", zap.Error(err)) return err @@ -32,7 +32,7 @@ func (job *ManagedJob) run() error { runJarResp, err := job.client.RunJar(api.RunOpts{ JarID: job.jarId, AllowNonRestoredState: true, - EntryClass: job.def.EntryClass, + EntryClass: job.def.Spec.EntryClass, }) if err != nil { lc.Logger.Error("[managed-job] [run]", zap.Error(err)) diff --git a/internal/managed_job/state.go b/internal/managed_job/state.go index f2ad920..0334387 100644 --- a/internal/managed_job/state.go +++ b/internal/managed_job/state.go @@ -13,7 +13,7 @@ import ( func (job *ManagedJob) loadState() { err := job.db.View( func(tx *buntdb.Tx) error { - if val, err := tx.Get(job.def.Key); err != nil { + if val, err := tx.Get(string(job.def.GetUID())); err != nil { return err } else { return json.Unmarshal([]byte(val), job.state) @@ -30,15 +30,15 @@ func (job *ManagedJob) updateState(state jobState) { value, _ := json.Marshal(job.state) job.db.Update(func(tx *buntdb.Tx) error { - _, _, err := tx.Set(job.def.Key, string(value), nil) - if err != nil { - return err - } - return tx.Commit() + _, _, err := tx.Set(string(job.def.GetUID()), string(value), nil) + return err }) } func (job *ManagedJob) setError(errMsg string) { + if job.state == nil { + job.state = &jobState{} + } job.state.Error = &errMsg job.state.Status = JobStatusError job.updateState(*job.state) diff --git a/internal/manager/manager.go b/internal/manager/manager.go new file mode 100644 index 0000000..5a8fbcf --- /dev/null +++ b/internal/manager/manager.go @@ -0,0 +1,42 @@ +package manager + +import ( + "flink-kube-operator/internal/crd" + "flink-kube-operator/internal/managed_job" + "time" + + api "github.com/logi-camp/go-flink-client" + "github.com/tidwall/buntdb" + "k8s.io/apimachinery/pkg/types" +) + +var managedJobs = map[types.UID]managed_job.ManagedJob{} + +func Setup(client *api.Client, db *buntdb.DB) { + ticker := time.NewTicker(5 * time.Second) + quit := make(chan struct{}) + + go func() { + for { + select { + case <-ticker.C: + cycle(client, db) + case <-quit: + ticker.Stop() + return + } + } + }() +} + +func cycle(client *api.Client, db *buntdb.DB) { + for _, uid := range crd.GetAllJobKeys() { + job := crd.GetJob(uid) + _, ok := managedJobs[uid] + if ok { + + } else { + managed_job.NewManagedJob(client, db, job) + } + } +}