diff --git a/internal/crd/patch.go b/internal/crd/patch.go index 434121a..afda2e4 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -42,7 +42,7 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error if err != nil { pkg.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err)) } - jobs[jobUid] = newJob + jobs.Store(jobUid, newJob) if err != nil { pkg.Logger.Error("[crd] [status] ", zap.Error(err)) return err diff --git a/internal/crd/repo.go b/internal/crd/repo.go index 55045b4..d96c94f 100644 --- a/internal/crd/repo.go +++ b/internal/crd/repo.go @@ -2,25 +2,28 @@ package crd import ( "flink-kube-operator/internal/crd/v1alpha1" - "maps" + "flink-kube-operator/pkg" "k8s.io/apimachinery/pkg/types" ) -var jobs = map[types.UID]*v1alpha1.FlinkJob{} +// var jobs = map[types.UID]*v1alpha1.FlinkJob{} +var jobs = pkg.SafeMap[types.UID, *v1alpha1.FlinkJob]{} func (crd *Crd) repsert(job *v1alpha1.FlinkJob) { - jobs[job.GetUID()] = job + jobs.Store(job.GetUID(), job) } func GetJob(uid types.UID) v1alpha1.FlinkJob { - return *jobs[uid].DeepCopy() + job, _ := jobs.Load(uid) + return *job.DeepCopy() } func GetAllJobKeys() []types.UID { keys := []types.UID{} - for k := range maps.Keys(jobs) { + jobs.Range(func(k types.UID, value *v1alpha1.FlinkJob) bool { keys = append(keys, k) - } + return true + }) return keys } diff --git a/pkg/safemap.go b/pkg/safemap.go new file mode 100644 index 0000000..7c616f5 --- /dev/null +++ b/pkg/safemap.go @@ -0,0 +1,42 @@ +package pkg + +import "sync" + +// SafeMap is a type-safe wrapper around sync.Map +type SafeMap[K comparable, V any] struct { + m sync.Map +} + +// Store sets the value for a key. +func (t *SafeMap[K, V]) Store(key K, value V) { + t.m.Store(key, value) +} + +// Load returns the value for a key, and whether the key was found. +func (t *SafeMap[K, V]) Load(key K) (V, bool) { + value, ok := t.m.Load(key) + if !ok { + var zero V + return zero, false + } + return value.(V), true +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +func (t *SafeMap[K, V]) LoadOrStore(key K, value V) (V, bool) { + actual, loaded := t.m.LoadOrStore(key, value) + return actual.(V), loaded +} + +// Delete removes the key and its value from the map. +func (t *SafeMap[K, V]) Delete(key K) { + t.m.Delete(key) +} + +// Range iterates over all key-value pairs in the map. +func (t *SafeMap[K, V]) Range(f func(key K, value V) bool) { + t.m.Range(func(key, value interface{}) bool { + return f(key.(K), value.(V)) + }) +}