fix: prevent concurrent jobs map writes by adding synchronization
This fix prevents undefined behavior and runtime crashes in multithreaded use cases.
This commit is contained in:
parent
02d8f0b02e
commit
9f36dca7c9
@ -42,7 +42,7 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
pkg.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err))
|
pkg.Logger.Error("[crd] [status] error in structure unstructured patched", zap.Error(err))
|
||||||
}
|
}
|
||||||
jobs[jobUid] = newJob
|
jobs.Store(jobUid, newJob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pkg.Logger.Error("[crd] [status] ", zap.Error(err))
|
pkg.Logger.Error("[crd] [status] ", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -2,25 +2,28 @@ package crd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flink-kube-operator/internal/crd/v1alpha1"
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"maps"
|
"flink-kube-operator/pkg"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var jobs = map[types.UID]*v1alpha1.FlinkJob{}
|
// var jobs = map[types.UID]*v1alpha1.FlinkJob{}
|
||||||
|
var jobs = pkg.SafeMap[types.UID, *v1alpha1.FlinkJob]{}
|
||||||
|
|
||||||
func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
|
func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
|
||||||
jobs[job.GetUID()] = job
|
jobs.Store(job.GetUID(), job)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetJob(uid types.UID) v1alpha1.FlinkJob {
|
func GetJob(uid types.UID) v1alpha1.FlinkJob {
|
||||||
return *jobs[uid].DeepCopy()
|
job, _ := jobs.Load(uid)
|
||||||
|
return *job.DeepCopy()
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAllJobKeys() []types.UID {
|
func GetAllJobKeys() []types.UID {
|
||||||
keys := []types.UID{}
|
keys := []types.UID{}
|
||||||
for k := range maps.Keys(jobs) {
|
jobs.Range(func(k types.UID, value *v1alpha1.FlinkJob) bool {
|
||||||
keys = append(keys, k)
|
keys = append(keys, k)
|
||||||
}
|
return true
|
||||||
|
})
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|||||||
42
pkg/safemap.go
Normal file
42
pkg/safemap.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package pkg
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// SafeMap is a type-safe wrapper around sync.Map
|
||||||
|
type SafeMap[K comparable, V any] struct {
|
||||||
|
m sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store sets the value for a key.
|
||||||
|
func (t *SafeMap[K, V]) Store(key K, value V) {
|
||||||
|
t.m.Store(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load returns the value for a key, and whether the key was found.
|
||||||
|
func (t *SafeMap[K, V]) Load(key K) (V, bool) {
|
||||||
|
value, ok := t.m.Load(key)
|
||||||
|
if !ok {
|
||||||
|
var zero V
|
||||||
|
return zero, false
|
||||||
|
}
|
||||||
|
return value.(V), true
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadOrStore returns the existing value for the key if present.
|
||||||
|
// Otherwise, it stores and returns the given value.
|
||||||
|
func (t *SafeMap[K, V]) LoadOrStore(key K, value V) (V, bool) {
|
||||||
|
actual, loaded := t.m.LoadOrStore(key, value)
|
||||||
|
return actual.(V), loaded
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the key and its value from the map.
|
||||||
|
func (t *SafeMap[K, V]) Delete(key K) {
|
||||||
|
t.m.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Range iterates over all key-value pairs in the map.
|
||||||
|
func (t *SafeMap[K, V]) Range(f func(key K, value V) bool) {
|
||||||
|
t.m.Range(func(key, value interface{}) bool {
|
||||||
|
return f(key.(K), value.(V))
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user