61 lines
1.3 KiB
Go

package crd
import (
"flink-kube-operator/internal/crd/v1alpha1"
"os"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
// Crd bundles the two Kubernetes clients this package uses to work with
// FlinkJob custom resources.
type Crd struct {
	// client is the dynamic resource interface for the FlinkJob
	// GroupVersionResource, scoped to a namespace by New.
	client dynamic.ResourceInterface

	// runtimeClient is a controller-runtime client built by New with a
	// scheme that has the v1alpha1 types registered.
	runtimeClient client.Client
}
// New constructs a Crd wired to the Kubernetes API and starts its FlinkJob
// event handling.
//
// The REST config for the dynamic client is taken from the recommended home
// kubeconfig file; if that cannot be loaded, the in-cluster config is used
// instead. The namespace is read from the NAMESPACE environment variable.
//
// New panics if no REST config can be obtained or if either client cannot be
// created. It starts manageFinalizer in a goroutine and then calls
// watchFlinkJobs synchronously, so it returns only after watchFlinkJobs does.
func New() *Crd {
	// Resolve the REST config: home kubeconfig first, in-cluster as fallback.
	restCfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		if restCfg, err = rest.InClusterConfig(); err != nil {
			panic(err)
		}
	}

	// Dynamic client used for the namespaced FlinkJob resource interface.
	dyn, err := dynamic.NewForConfig(restCfg)
	if err != nil {
		panic(err)
	}

	// Scheme carrying the v1alpha1 types for the controller-runtime client.
	typeScheme := runtime.NewScheme()
	v1alpha1.AddKnownTypes(typeScheme)

	namespace := os.Getenv("NAMESPACE")
	instance := &Crd{
		client: dyn.Resource(v1alpha1.FlinkJobGVR).Namespace(namespace),
	}

	// NOTE(review): this client loads its own config through
	// config.GetConfigOrDie rather than reusing restCfg, so the two clients
	// may be resolved from different sources — confirm this is intended.
	instance.runtimeClient, err = client.New(config.GetConfigOrDie(), client.Options{Scheme: typeScheme})
	if err != nil {
		panic(err)
	}

	// Unbuffered channel shared by the finalizer goroutine and the watcher.
	events := make(chan FlinkJobCrdEvent)

	// Presumably attaches finalizers to newly seen resources (see
	// manageFinalizer); runs for as long as that method does.
	go instance.manageFinalizer(events)

	// Watch FlinkJob resources; this call is made on the caller's goroutine.
	instance.watchFlinkJobs(events)

	return instance
}