58 lines
1.2 KiB
Go
58 lines
1.2 KiB
Go
package crd
|
|
|
|
import (
|
|
"flink-kube-operator/internal/crd/v1alpha1"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/client/config"
|
|
)
|
|
|
|
type Crd struct {
|
|
client dynamic.NamespaceableResourceInterface
|
|
runtimeClient client.Client
|
|
}
|
|
|
|
func New() *Crd {
|
|
// Get Kubernetes config_
|
|
config_, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
|
|
if err != nil {
|
|
config_, err = rest.InClusterConfig()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// Create dynamic client
|
|
dynamicClient, err := dynamic.NewForConfig(config_)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
shema := runtime.NewScheme()
|
|
v1alpha1.AddKnownTypes(shema)
|
|
// Get FlinkJob resource interface
|
|
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR)
|
|
runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{
|
|
Scheme: shema,
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
crd := Crd{
|
|
client: flinkJobClient,
|
|
runtimeClient: runtimeClient,
|
|
}
|
|
|
|
// Watch for FlinkJob creation
|
|
jobEventObservable := crd.watchFlinkJobs()
|
|
|
|
// add finalizer to new resources
|
|
go crd.manageFinalizer(jobEventObservable)
|
|
|
|
return &crd
|
|
}
|