Compare commits

..

No commits in common. "main" and "feature/kube-crd-control" have entirely different histories.

81 changed files with 387 additions and 1461 deletions

2
.vscode/launch.json vendored
View File

@ -10,7 +10,7 @@
"request": "launch",
"mode": "auto",
"env": {
"FLINK_API_URL": "flink.bz2:8081",
"FLINK_API_URL": "127.0.0.1:8081",
"SAVEPOINT_PATH": "/opt/flink/savepoints"
},
"cwd": "${workspaceFolder}",

View File

@ -8,13 +8,12 @@
"flink",
"gitea",
"gonanoid",
"huma",
"logicamp",
"Namespaceable",
"nindent",
"reactivex",
"repsert",
"taskmanager",
"rxgo",
"tolerations"
]
}

View File

@ -1,8 +1,8 @@
FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build
ARG upx_version=4.2.4
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \
curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
chmod +x /usr/local/bin/upx && \
@ -20,14 +20,13 @@ COPY . .
# Build
ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
&& upx -q -9 /flink-kube-operator
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
RUN upx -q -5 /flink-kube-operator
FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
COPY --from=build /flink-kube-operator /flink-kube-operator
COPY --from=build /etc/ssl/certs /etc/ssl/certs
EXPOSE 8083

View File

@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17
FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17
# Set working directory
WORKDIR /opt/flink
@ -15,19 +15,8 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

View File

@ -1,6 +0,0 @@
Installation:
```bash
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
```

View File

@ -2,8 +2,7 @@ package main
import (
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/managed_job"
"flink-kube-operator/internal/rest"
"flink-kube-operator/internal/manager"
"flink-kube-operator/pkg"
"log"
"os"
@ -31,7 +30,7 @@ func main() {
pkg.Logger.Info("[main]", zap.Any("cluster-config", clusterConfig))
// init flink job manager
managed_job.NewManager(c, crdInstance)
manager.NewManager(c, crdInstance)
// for _, jobDef := range config.Jobs {
// managed_job.NewManagedJob(c, db, jobDef)
@ -49,8 +48,6 @@ func main() {
// // pkg.Logger.Debug("[main]", zap.Any("job", job))
// }
go rest.Init()
cancelChan := make(chan os.Signal, 1)
sig := <-cancelChan
log.Printf("Caught signal %v", sig)

View File

@ -36,14 +36,6 @@ spec:
type: integer
jarUri:
type: string
jarURIBasicAuthUsername:
type: string
jarURIBasicAuthPassword:
type: string
args:
type: array
items:
type: string
savepointInterval:
type: string
format: duration

View File

@ -3,14 +3,13 @@ apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob
metadata:
name: my-flink-job
namespace: default
spec:
key: word-count
name: "Word Count Example"
entryClass: "tech.logicamp.logiline.FacilityEnrichment"
parallelism: 1
jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
jarURIBasicAuthUsername: logiline-actrunner
jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong
entryClass: "org.apache.flink.examples.java.wordcount.WordCount"
parallelism: 2
jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
parallelism.default: "1"
taskmanager.numberOfTaskSlots: "2"
parallelism.default: "2"

19
go.mod
View File

@ -3,9 +3,8 @@ module flink-kube-operator
go 1.23.2
require (
github.com/danielgtaylor/huma/v2 v2.27.0
github.com/gofiber/fiber/v2 v2.52.6
github.com/logi-camp/go-flink-client v0.2.0
github.com/matoous/go-nanoid/v2 v2.1.0
github.com/samber/lo v1.47.0
go.uber.org/zap v1.27.0
k8s.io/apimachinery v0.31.3
@ -13,7 +12,6 @@ require (
)
require (
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
@ -23,20 +21,16 @@ require (
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.58.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
@ -49,12 +43,17 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/reactivex/rxgo/v2 v2.5.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.6.0 // indirect

51
go.sum
View File

@ -1,14 +1,15 @@
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/danielgtaylor/huma/v2 v2.27.0 h1:yxgJ8GqYqKeXw/EnQ4ZNc2NBpmn49AlhxL2+ksSXjUI=
github.com/danielgtaylor/huma/v2 v2.27.0/go.mod h1:NbSFXRoOMh3BVmiLJQ9EbUpnPas7D9BeOxF/pZBAGa0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
@ -27,8 +28,6 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI=
github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
@ -53,8 +52,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
@ -64,17 +62,15 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE=
github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -91,9 +87,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/reactivex/rxgo/v2 v2.5.0 h1:FhPgHwX9vKdNQB2gq9EPt+EKk9QrrzoeztGbEEnZam4=
github.com/reactivex/rxgo/v2 v2.5.0/go.mod h1:bs4fVZxcb5ZckLIOeIeVH942yunJLWDABWGbrHAW+qU=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
@ -103,24 +98,22 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.58.0 h1:GGB2dWxSbEprU9j0iMJHgdKYJVDyjrOwF9RE59PbRuE=
github.com/valyala/fasthttp v1.58.0/go.mod h1:SYXvHHaFp7QZHGKSHmoMipInhrI5StHrhDTYVEjK/Kw=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 h1:BLNsFR8l/hj/oGjnJXkd4Vi3s4kQD3/3x8HSAE4bzN0=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775/go.mod h1:XUZ4x3oGhWfiOnUvTslnKKs39AWUct3g3yJvXTQSJOQ=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@ -132,8 +125,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -145,13 +140,15 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -161,6 +158,8 @@ golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@ -173,6 +172,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
@ -181,6 +181,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

24
helm/Chart.yaml Normal file
View File

@ -0,0 +1,24 @@
apiVersion: v2
name: flink-kube-operator
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"

View File

@ -1,6 +0,0 @@
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
generated: "2025-04-04T14:42:09.771390014+03:30"

View File

@ -1,6 +0,0 @@
apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 1.2.3
appVersion: "0.1.1"

View File

@ -1,12 +0,0 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@ -1,43 +0,0 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/savepoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- end }}
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-ha-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.ha.size }} # Use size defined in values.yaml

View File

@ -1,102 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -1,28 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- name: flink-web-ui
port: 8081
targetPort: 8081
- name: rpc
port: 6123
targetPort: 6123
- name: blob
port: 6124
targetPort: 6124
- name: query
port: 6125
targetPort: 6125
- name: operator
port: 3000
targetPort: 3000
selector:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@ -1,12 +0,0 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@ -1,80 +0,0 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-task-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: taskmanager
spec:
serviceName: {{ .Release.Name }}-flink-task-manager
replicas: {{ .Values.flink.taskManager.replicas }}
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: task-manager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
env:
{{- include "flink.env" . | nindent 8 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
volumes:
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage
spec:
accessModes: [ ReadWriteOnce ]
resources:
requests:
storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}

View File

@ -1,17 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Release.Name }}-flink-operator
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
protocol: TCP
name: http
selector:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator

View File

@ -1,70 +0,0 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-operator
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceName: {{ .Release.Name }}-flink-operator
replicas: 1
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: wait-for-jobmanager
image: curlimages/curl:8.5.0 # Lightweight curl image
command:
- sh
- -c
- |
echo "Waiting for Flink JobManager to be ready..."
until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
echo "JobManager not ready yet - retrying in 5s..."
sleep 5
done
echo "JobManager is ready!"
containers:
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Release.Name }}-flink-job-manager:8081
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
{{- if eq .Values.flink.state.savepoint.storageType "s3" }}
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-user
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-password
{{- else }}
- name: SAVEPOINT_PATH
value: /opt/flink/savepoints/
{{- end }}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,240 +0,0 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-07-18T18:09:46.27166563+03:30"
description: Helm chart for flink kube operator
digest: 597f2c07884bb5411dcc6e1a9cdf7672977858efe30273a46fb6525eb6013091
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.3.tgz
version: 1.2.3
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:34:55.317942453+03:30"
description: Helm chart for flink kube operator
digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.2.tgz
version: 1.2.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:01:29.891695937+03:30"
description: Helm chart for flink kube operator
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.1.tgz
version: 1.2.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2
appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30"
description: Helm chart for flink kube operator
digest: 00acef7878bcf372d036fabaac400913097d678087a756102b54a28428197bdf
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.14.tgz
version: 0.1.14
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T23:13:19.698003661+03:30"
description: Helm chart for flink kube operator
digest: d104b9242362415a7b920e4e2af975730e208ff73db17b8d2afd11ea8b78b4a2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.13.tgz
version: 0.1.13
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T23:13:19.697555829+03:30"
description: Helm chart for flink kube operator
digest: f58802990389ecde00a49a442f6e83a007e281e972d07f2979657d2763fe94ba
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.12.tgz
version: 0.1.12
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.491747333+03:30"
description: Helm chart for flink kube operator
digest: 0daa98c63b443018c2072a2d7448c972faff2274fb04433c613532b408cd3ab1
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.11.tgz
version: 0.1.11
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.490697387+03:30"
description: Helm chart for flink kube operator
digest: e091256eeb8640b61443cbe4781426ef493737ab0ac1145e568426bb2c1ed3ba
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.10.tgz
version: 0.1.10
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.495842696+03:30"
description: Helm chart for flink kube operator
digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.9.tgz
version: 0.1.9
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.495392608+03:30"
description: Helm chart for flink kube operator
digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.8.tgz
version: 0.1.8
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.494948853+03:30"
description: Helm chart for flink kube operator
digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.7.tgz
version: 0.1.7
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.49450822+03:30"
description: Helm chart for flink kube operator
digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.6.tgz
version: 0.1.6
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.494040193+03:30"
description: Helm chart for flink kube operator
digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.5.tgz
version: 0.1.5
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.493584927+03:30"
description: Helm chart for flink kube operator
digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.4.tgz
version: 0.1.4
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.493138547+03:30"
description: Helm chart for flink kube operator
digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.3.tgz
version: 0.1.3
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.492696005+03:30"
description: Helm chart for flink kube operator
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.2.tgz
version: 0.1.2
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.490170385+03:30"
description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.1.tgz
version: 0.1.1
- apiVersion: v2
appVersion: 0.1.0
created: "2025-03-04T18:04:35.489734651+03:30"
description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2025-07-18T18:09:46.244672127+03:30"

View File

@ -17,6 +17,6 @@
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8081 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}

View File

@ -0,0 +1,71 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
app.kubernetes.io/managed-by: Helm
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
app.kubernetes.io/instance: {{ .Release.Name }}
template:
metadata:
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: flink
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
ports:
- containerPort: 8081 # JobManager Web UI port
- containerPort: 6121 # TaskManager communication port
- containerPort: 6122 # TaskManager communication port
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost" # JobManager and TaskManager in the same container
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
web.upload.dir: /opt/flink/data/web-upload
state.checkpoints.dir: file:///tmp/flink-checkpoints
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/ha
kubernetes.cluster-id: cluster-one
kubernetes.namespace: {{ .Release.Namespace }}
volumeMounts:
- name: flink-data
mountPath: /opt/flink/data
subPath: data
- name: flink-data
mountPath: /opt/flink/web-upload
subPath: web-upload
- name: flink-savepoints
mountPath: /opt/flink/savepoints
- name: flink-savepoints
mountPath: /opt/flink/ha
subPath: ha
volumes:
- name: flink-data
emptyDir: {} # Temporary storage for internal data
- name: flink-savepoints
persistentVolumeClaim:
claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: flink # Adding the flink prefix to the service name
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- port: 8081
targetPort: 8081
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@ -0,0 +1,61 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "flink-kube-operator.fullname" . }}
labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "flink-kube-operator.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "flink-kube-operator.labels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Values.config.flinkApiUrl }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -2,6 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ include "flink-kube-operator.serviceAccountName" . }}
namespace: {{ .Release.Namespace }} # Namespace where the role is created
labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }}
rules:

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "flink-kube-operator.fullname" . }}
labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
protocol: TCP
name: http
selector:
{{- include "flink-kube-operator.selectorLabels" . | nindent 4 }}

View File

@ -38,7 +38,8 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {} # fsGroup: 2000
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
@ -53,7 +54,7 @@ service:
# This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
type: ClusterIP
# This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
port: 3000
port: 80
# This block is for setting up the ingress for more information can be found here: https://kubernetes.io/docs/concepts/services-networking/ingress/
ingress:
@ -63,10 +64,10 @@ ingress:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
@ -105,6 +106,7 @@ autoscaling:
config:
flinkApiUrl: flink:8081
nodeSelector: {}
tolerations: []
@ -115,44 +117,22 @@ affinity: {}
flink:
image:
repository: lcr.logicamp.tech/library/flink
tag: 1.20.1-scala_2.12-java17-minicluster
tag: 1.20.0-scala_2.12-java17-minicluster
parallelism:
default: 1 # Default parallelism for Flink jobs
default: 1 # Default parallelism for Flink jobs
state:
backend: rocksdb # Use RocksDB for state backend
incremental: true
ha:
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager:
processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
backend: rocksdb # Use RocksDB for state backend
savepoints:
dir: "file:///opt/flink/savepoints" # Directory to store savepoints
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
taskManager:
numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory
replicas: 1 # Number of task manager replicas
storage:
rocksDb:
size: 4Gi
resources:
limits:
cpu: 3
memory: 4Gi
requests:
cpu: 1
memory: 2Gi
numberOfTaskSlots: 100 # Number of task slots for TaskManager
persistence:
enabled: true
size: 10Gi # PVC size for savepoints storage
accessModes:
- ReadWriteOnce

View File

@ -1,61 +1,14 @@
package crd
import (
"context"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/pkg"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"github.com/reactivex/rxgo/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
var FinalizerChannel chan (types.UID) = make(chan (types.UID))
func (crd Crd) manageFinalizer(jobEventChannel chan FlinkJobCrdEvent) {
finalizerName := "flink-operator.logicamp.tech/finalizer"
for jobEvent := range jobEventChannel {
pkg.Logger.Debug("[crd] [manage-finalizer] main loop", zap.String("name", jobEvent.Job.Name))
go func() {
if jobEvent.Job.GetDeletionTimestamp() != nil {
// Resource is being deleted
if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
// Perform cleanup
pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
if err := crd.cleanupResources(jobEvent.Job); err != nil {
pkg.Logger.Info("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
return
}
// Remove finalizer
controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Info("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
return
}
pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))
}
return
}
// Add finalizer if not present
if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
pkg.Logger.Debug("[finalizer] adding job")
// Update the resource to add the finalizer
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
pkg.Logger.Info("[finalizer] failed to add", zap.Error(err))
return
}
}
}()
func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
for j := range jobEventObservable.Observe() {
jobEvent := j.V.(*FlinkJobCrdEvent)
//pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName()))
controllerutil.AddFinalizer(jobEvent.Job, "")
}
}
func (crd Crd) cleanupResources(job *v1alpha1.FlinkJob) error {
FinalizerChannel <- job.GetUID()
return nil
}

View File

@ -2,59 +2,44 @@ package crd
import (
"flink-kube-operator/internal/crd/v1alpha1"
"os"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
type Crd struct {
client dynamic.ResourceInterface
runtimeClient client.Client
client dynamic.NamespaceableResourceInterface
}
func New() *Crd {
// Get Kubernetes config_
config_, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
// Get Kubernetes config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
config_, err = rest.InClusterConfig()
config, err = rest.InClusterConfig()
if err != nil {
panic(err)
}
}
// Create dynamic client
dynamicClient, err := dynamic.NewForConfig(config_)
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
scheme := runtime.NewScheme()
v1alpha1.AddKnownTypes(scheme)
// Get FlinkJob resource interface
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR).Namespace(os.Getenv("NAMESPACE"))
runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{
Scheme: scheme,
})
if err != nil {
panic(err)
}
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR)
crd := Crd{
client: flinkJobClient,
runtimeClient: runtimeClient,
client: flinkJobClient,
}
jobEventCh := make(chan FlinkJobCrdEvent)
// add finalizer to new resources
go crd.manageFinalizer(jobEventCh)
// Watch for FlinkJob creation
crd.watchFlinkJobs(jobEventCh)
jobEventObservable := crd.watchFlinkJobs()
// add finalizer to new resources
go crd.manageFinalizer(jobEventObservable)
return &crd
}

View File

@ -14,7 +14,6 @@ import (
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
job := GetJob(jobUid)
// pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
patchBytes, err := json.Marshal(patchData)
if err != nil {
@ -23,6 +22,7 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error
// Patch the status sub-resource
unstructuredJob, err := crd.client.
Namespace(job.GetNamespace()).
Patch(
context.Background(),
job.GetName(),

View File

@ -14,10 +14,6 @@ func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
jobs.Store(job.GetUID(), job)
}
func (crd *Crd) remove(uid types.UID) {
jobs.Delete(uid)
}
func GetJob(uid types.UID) v1alpha1.FlinkJob {
job, _ := jobs.Load(uid)
return *job.DeepCopy()

View File

@ -10,15 +10,12 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct {
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
}
type FlinkJobStatus struct {
@ -57,7 +54,6 @@ var (
ErrNoJarId = errors.New("[managed-job] no jar id")
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
)
type JobStatus string

View File

@ -20,11 +20,11 @@ var FlinkJobGVR = schema.GroupVersionResource{
}
var (
SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes)
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
func AddKnownTypes(scheme *runtime.Scheme) error {
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&FlinkJob{},
&FlinkJobList{},

View File

@ -3,10 +3,10 @@ package crd
import (
"context"
"flink-kube-operator/internal/crd/v1alpha1"
"os"
"flink-kube-operator/pkg"
"github.com/reactivex/rxgo/v2"
"go.uber.org/zap"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -14,57 +14,52 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) {
func (crd Crd) watchFlinkJobs() rxgo.Observable {
ch := make(chan rxgo.Item)
go func() {
for {
pkg.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
if err != nil {
panic(err)
}
namespace := os.Getenv("NAMESPACE")
pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
for event := range watcher.ResultChan() {
unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
continue
}
job := &v1alpha1.FlinkJob{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
continue
}
if job.Namespace != namespace {
continue
}
go func() {
ch <- FlinkJobCrdEvent{
EventType: event.Type,
Job: job,
}
}()
pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type)))
switch event.Type {
case watch.Bookmark:
case watch.Modified:
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job)
case watch.Added:
//pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job)
case watch.Deleted:
crd.remove(job.UID)
}
}
defer watcher.Stop()
pkg.Logger.Warn("[crd] [watch] Watcher stopped, restarting...")
pkg.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
if err != nil {
panic(err)
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil {
pkg.Logger.Error("cannot create unstructured map", zap.Error(err))
continue
}
job := &v1alpha1.FlinkJob{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
if err != nil {
pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err))
continue
}
ch <- rxgo.Item{
V: &FlinkJobCrdEvent{
EventType: event.Type,
Job: job,
},
}
switch event.Type {
case watch.Bookmark:
case watch.Modified:
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job)
case watch.Added:
pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job)
case watch.Deleted:
}
}
}()
return rxgo.FromChannel(ch)
}

View File

@ -1,34 +1,27 @@
package jar
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"os"
"strings"
"flink-kube-operator/pkg"
api "github.com/logi-camp/go-flink-client"
gonanoid "github.com/matoous/go-nanoid/v2"
"go.uber.org/zap"
)
type JarFile struct {
uri string
filePath string
basicAuthUsername *string
basicAuthPassword *string
uri string
filePath string
}
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
func NewJarFile(URI string) (*JarFile, error) {
jarFile := &JarFile{
uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
uri: URI,
}
err := jarFile.Download()
if err != nil {
@ -53,9 +46,7 @@ func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err er
}
func (jarFile *JarFile) Download() error {
randBytes := make([]byte, 16)
rand.Read(randBytes)
fileName := hex.EncodeToString(randBytes)
fileName, _ := gonanoid.New()
jarFile.filePath = "/tmp/" + fileName + ".jar"
out, err := os.Create(jarFile.filePath)
if err != nil {
@ -63,45 +54,9 @@ func (jarFile *JarFile) Download() error {
}
defer out.Close()
var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
client := &http.Client{
Jar: &cookiejar.Jar{},
CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 {
jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
if resp.StatusCode > 299 {
respBody := []byte{}
resp.Body.Read(respBody)
err = errors.New(string(respBody) + " status:" + resp.Status)
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}

View File

@ -10,11 +10,16 @@ import (
)
func (job *ManagedJob) Cycle() {
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
// Init job
if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
job.Run(false)
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
job.run(false)
return
}
if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused {
job.run(true)
return
}
@ -36,12 +41,6 @@ func (job *ManagedJob) Cycle() {
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
return
}
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return
}
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
// //job.restore()
// return
@ -51,5 +50,5 @@ func (job *ManagedJob) Cycle() {
// return
// }
pkg.Logger.Warn("[managed-job] [cycle] unhandled job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus)), zap.String("namespace", job.def.Namespace))
pkg.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus)))
}

View File

@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)
func (job *ManagedJob) Pause() error {
func (job *ManagedJob) pause() error {
var err error
if job.def.Status.JobId != nil {
result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
@ -33,7 +33,7 @@ func (job *ManagedJob) Pause() error {
if savepointPath != "" {
job.def.Status.LastSavepointPath = &savepointPath
job.def.Status.PauseSavepointTriggerId = nil
job.def.Status.JobStatus = "FINISHED"
job.def.Status.JobStatus = ""
job.def.Status.LastSavepointPath = &savepointPath
lastSavepointDate := time.Now()
job.def.Status.LastSavepointDate = &lastSavepointDate

View File

@ -1,11 +1,5 @@
package managed_job
import "errors"
func (job *ManagedJob) Stop() error {
if job.def.Status.JobId != nil {
return job.client.StopJob(*job.def.Status.JobId)
} else {
return errors.New("job Id not found")
}
func (job *ManagedJob) Stop() {
job.client.StopJob(*job.def.Status.JobId)
}

View File

@ -11,8 +11,8 @@ import (
"go.uber.org/zap"
)
// Run the job from savepoint and jarId in managedJob
func (job *ManagedJob) Run(restoreMode bool) error {
// run the job from savepoint and jarId in managedJob
func (job *ManagedJob) run(restoreMode bool) error {
var savepointPath string
if job.def.Status.LastSavepointPath == nil {
pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
@ -42,25 +42,16 @@ func (job *ManagedJob) Run(restoreMode bool) error {
AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArg: job.def.Spec.Args,
})
if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
jobId = &runJarResp.JobId
break
} else {
if strings.Contains(err.Error(), ".jar does not exist") {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
if strings.ContainsAny(err.Error(), ".jar does not exist") {
shouldUpload = true
} else {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
stringErr := err.Error()
job.def.Status.Error = &stringErr
job.def.Status.JobStatus = ""
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return v1alpha1.ErrOnStartingJob
}
}
}
@ -75,9 +66,9 @@ func (job *ManagedJob) Run(restoreMode bool) error {
})
return nil
}
shouldUpload = false
continue
}
return nil
}
// job.def.Status.JobId = &runJarResp.JobId

View File

@ -17,14 +17,14 @@ func (job ManagedJob) createSavepoint() error {
pkg.Logger.Debug("[managed-job] [savepoint] no job id")
return v1alpha1.ErrNoJobId
}
pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("name", job.def.GetName()), zap.String("interval", job.def.Spec.SavepointInterval.String()))
pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
resp, err := job.client.SavePoints(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
if err != nil {
pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
return err
}
pkg.Logger.Debug("[managed-job] [savepoint] savepoint created successfully", zap.String("trigger-id", resp.RequestID))
job.def.Status.SavepointTriggerId = &resp.RequestID
job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"savepointTriggerId": resp.RequestID,
@ -80,16 +80,3 @@ func (job ManagedJob) trackSavepoint() error {
return nil
}
func (job ManagedJob) TriggerSavepoint() error {
err := job.createSavepoint()
if err != nil {
return err
}
err = job.trackSavepoint()
return err
}
func (job ManagedJob) GetLastSavepointPath() *string {
return job.def.Status.LastSavepointPath
}

View File

@ -18,7 +18,7 @@ func (job *ManagedJob) upgrade() {
"jarId": job.def.Status.JarId,
},
})
err := job.Pause()
err := job.pause()
if err != nil {
pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err))
return
@ -30,7 +30,7 @@ func (job *ManagedJob) upgrade() {
zap.Error(err),
)
err = job.Run(true)
err = job.run(true)
if err != nil {
pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err))
return

View File

@ -2,6 +2,7 @@ package managed_job
import (
"flink-kube-operator/internal/jar"
"flink-kube-operator/pkg"
"go.uber.org/zap"
@ -9,7 +10,7 @@ import (
// upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err
@ -29,16 +30,3 @@ func (job *ManagedJob) upload() error {
})
return nil
}
func (job *ManagedJob) RemoveJar() {
if job.def.Status.JarId != nil {
err := job.client.DeleteJar(*job.def.Status.JarId)
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
err = job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"jarId": nil,
},
})
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
}
}

View File

@ -1,8 +1,9 @@
package managed_job
package manager
import (
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/managed_job"
"time"
"flink-kube-operator/pkg"
@ -15,22 +16,16 @@ import (
type Manager struct {
client *api.Client
managedJobs map[types.UID]ManagedJob
managedJobs map[types.UID]managed_job.ManagedJob
processingJobsIds []types.UID
}
var mgr Manager
func GetManager() Manager {
return mgr
}
func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
mgr = Manager{
mgr := Manager{
client: client,
managedJobs: map[types.UID]ManagedJob{},
managedJobs: map[types.UID]managed_job.ManagedJob{},
processingJobsIds: []types.UID{},
}
@ -46,18 +41,6 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
}
}
}()
go func() {
for event := range crd.FinalizerChannel {
manager := mgr.GetJob(event)
if manager != nil {
err := manager.Stop()
pkg.Logger.Info("[finalizer]", zap.Error(err))
delete(mgr.managedJobs, event)
}
}
}()
return mgr
}
@ -71,7 +54,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
},
})
}
pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobManagerJobOverviews))
//pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews))
// Loop over job definitions as Kubernetes CRD
for _, uid := range crd.GetAllJobKeys() {
@ -85,26 +68,26 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
mgr.processingJobsIds = append(mgr.processingJobsIds, uid)
// Check if job exists in manager managed jobs
managedJob, jobFound := mgr.managedJobs[uid]
if jobFound {
managedJob, ok := mgr.managedJobs[uid]
if ok {
managedJob.Update(def)
} else {
// Add job to manager managed job
managedJob = *NewManagedJob(client, def, crdInstance)
managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
}
if jobManagerJobStatusError != nil {
}
jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
jobId := managedJob.GetJobId()
if jobId != nil {
return job.ID == *jobId
}
return false
})
if jobFound {
// pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
if ok {
pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
patchStatusObj := map[string]interface{}{
"jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
}
@ -116,16 +99,6 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
crdInstance.Patch(uid, map[string]interface{}{
"status": patchStatusObj,
})
} else {
// TODO handle job not found status
// patchStatusObj := map[string]interface{}{
// "jobStatus": "",
// "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
// }
// crdInstance.Patch(uid, map[string]interface{}{
// "status": patchStatusObj,
// })
}
managedJob.Cycle()
@ -136,12 +109,3 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
})
}
}
func (mgr *Manager) GetJobs() map[types.UID]ManagedJob {
return mgr.managedJobs
}
func (mgr *Manager) GetJob(id types.UID) *ManagedJob {
job := mgr.managedJobs[id]
return &job
}

View File

@ -1,46 +0,0 @@
package rest
import (
"fmt"
"log"
"github.com/danielgtaylor/huma/v2"
humaFiber "github.com/danielgtaylor/huma/v2/adapters/humafiber"
"github.com/gofiber/fiber/v2"
)
func Init() {
app := fiber.New()
app.Use(func(c *fiber.Ctx) error {
// Logic to execute before the next handler
fmt.Printf("Request Method: %s, URL: %s\n", c.Method(), c.OriginalURL())
// Call the next handler in the stack
err := c.Next()
// Logic to execute after the next handler
fmt.Println("Request completed")
return err
})
config := huma.DefaultConfig("Go API", "1.0.0")
config.Servers = []*huma.Server{{}}
config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{
"auth": {
Type: "http",
Scheme: "bearer",
BearerFormat: "JWT",
},
}
api := humaFiber.New(app, config)
api.UseMiddleware(
func(ctx huma.Context, next func(huma.Context)) {
ctx = huma.WithValue(ctx, "humaContext", ctx)
next(ctx)
},
)
initRouter(api)
log.Fatal(app.Listen(fmt.Sprintf(":%s", "3000")))
}

View File

@ -1,253 +0,0 @@
package controller
import (
"archive/zip"
"bufio"
"context"
"errors"
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/managed_job"
"flink-kube-operator/internal/rest/types"
"flink-kube-operator/pkg"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/danielgtaylor/huma/v2"
"go.uber.org/zap"
k8sTypes "k8s.io/apimachinery/pkg/types"
)
type GetJobsReq struct {
}
type GetJobsResp struct {
Body []v1alpha1.FlinkJob
}
func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
jobs := []v1alpha1.FlinkJob{}
for _, key := range crd.GetAllJobKeys() {
job := crd.GetJob(key)
job.ManagedFields = nil
jobs = append(jobs, job)
}
return &GetJobsResp{Body: jobs}, nil
}
type StopJobReq struct {
JobUId string `path:"uid"`
}
type StopJobRespBody struct {
Success bool `json:"success"`
}
type StopJobResp struct {
Body StopJobRespBody
}
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Stop()
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Run(false)
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.Run(true)
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
job.RemoveJar()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
job.Pause()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
type JobTriggerSavepointReq struct {
JobUId string `path:"uid"`
}
type JobTriggerSavepointRespBody struct {
Success bool `json:"success"`
}
type JobTriggerSavepointResp struct {
Body JobTriggerSavepointRespBody
}
func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
err := job.TriggerSavepoint()
if err != nil {
return nil, err
}
return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{
Success: true,
}}, nil
}
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
mgr := managed_job.GetManager()
job := mgr.GetJob(k8sTypes.UID(req.JobUID))
lastSavepointPath := job.GetLastSavepointPath()
if lastSavepointPath == nil {
return nil, huma.Error404NotFound("there is no savepoint path is registered for the job")
}
folderPath := strings.TrimLeft(*lastSavepointPath, "file:") // Change this to your folder path
// Create a temporary zip file
zipFilePath, err := filepath.Abs("./savepoint.zip")
pkg.Logger.Debug("[controller] [savepoint]",
zap.String("zipFileName", zipFilePath),
zap.String("folderPath", folderPath),
)
// Create the zip file
zipFile, err := os.Create(zipFilePath)
if err != nil {
fmt.Println("Error creating zip file:", err)
return nil, nil
}
defer zipFile.Close()
// Create a new zip writer
zipWriter := zip.NewWriter(zipFile)
defer zipWriter.Close()
// Walk through the source directory and add files to the zip
err = filepath.Walk(folderPath, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Create a new file header
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
// Set the header name to the relative path
header.Name, err = filepath.Rel(folderPath, filePath)
if err != nil {
return err
}
// If it's a directory, add a trailing slash
if info.IsDir() {
header.Name += "/"
} else {
// Set the compression method
header.Method = zip.Deflate
}
// Create a new writer for the file
writer, err := zipWriter.CreateHeader(header)
if err != nil {
return err
}
// If it's a directory, we're done
if info.IsDir() {
return nil
}
// Open the file to be zipped
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// Copy the file content to the zip writer
_, err = io.Copy(writer, file)
return err
})
// Open the zip file for reading
zipFileReader, err := os.Open(zipFilePath)
if err != nil {
return nil, fmt.Errorf("failed to open zip file: %w", err)
}
//defer zipFileReader.Close()
fileInfo, err := zipFileReader.Stat()
if err != nil {
return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
}
resp := &huma.StreamResponse{
Body: func(ctx huma.Context) {
ctx.SetHeader("Content-Type", "application/zip")
ctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
ctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%s", zipFilePath))
writer := ctx.BodyWriter()
br := bufio.NewReader(zipFileReader)
for {
b, err := br.ReadByte()
if err != nil && !errors.Is(err, io.EOF) {
fmt.Println(err)
break
}
// process the one byte b
if err != nil {
// end of file
break
}
writer.Write([]byte{b})
}
os.Remove(zipFilePath)
},
}
return resp, nil
}

View File

@ -1,82 +0,0 @@
package rest
import (
"flink-kube-operator/internal/rest/controller"
"net/http"
"github.com/danielgtaylor/huma/v2"
)
func initRouter(api huma.API) {
huma.Register(api, huma.Operation{
OperationID: "get-jobs",
Method: http.MethodGet,
Path: "/jobs",
Summary: "Get Jobs",
Description: "Get Flink Jobs",
Tags: []string{"Job"},
}, controller.GetJobs)
huma.Register(api, huma.Operation{
OperationID: "stop-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/stop",
Summary: "Stop Job",
Description: "Stop Flink Job",
Tags: []string{"Job"},
}, controller.StopJob)
huma.Register(api, huma.Operation{
OperationID: "start-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/start",
Summary: "Start Job",
Description: "Start Flink Job",
Tags: []string{"Job"},
}, controller.StartJob)
huma.Register(api, huma.Operation{
OperationID: "resume-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/resume",
Summary: "Resume Job",
Description: "Resume Flink Job",
Tags: []string{"Job"},
}, controller.ResumeJob)
huma.Register(api, huma.Operation{
OperationID: "remove-jar",
Method: http.MethodPost,
Path: "/jobs/{uid}/remove-jar",
Summary: "Remove Job Jar",
Description: "Remove Flink Job Jar",
Tags: []string{"Job"},
}, controller.RemoveJobJar)
huma.Register(api, huma.Operation{
OperationID: "pause-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/pause",
Summary: "Pause Job",
Description: "Pause Flink Job",
Tags: []string{"Job"},
}, controller.PauseJob)
huma.Register(api, huma.Operation{
OperationID: "download-savepoint",
Method: http.MethodGet,
Path: "/savepoint/download",
Summary: "Download Savepoint",
Description: "Download Savepoint",
Tags: []string{"Savepoint"},
}, controller.DownloadSavepoint)
huma.Register(api, huma.Operation{
OperationID: "trigger-savepoint",
Method: http.MethodPost,
Path: "/jobs/{uid}/trigger-savepoint",
Summary: "Trigger Savepoint",
Description: "Trigger Savepoint",
Tags: []string{"Savepoint"},
}, controller.TriggerSavepoint)
}

View File

@ -1,12 +0,0 @@
package types
type SavepointDownloadReq struct {
JobUID string `query:"jobUID"`
}
type SavepointDownloadResp struct {
ContentType string `header:"Content-Type"`
ContentDisposition string `header:"Content-Disposition"`
ContentLength string `header:"Content-Length"`
Body []byte
}

View File

@ -2,8 +2,6 @@ package pkg
import (
"context"
"os"
"strconv"
"github.com/mattn/go-colorable"
"go.uber.org/zap"
@ -84,10 +82,8 @@ func OverrideLoggerConfig(config LoggerConfig) {
Logger = createOrUpdateInstance(config)
}
var level, err = strconv.Atoi(os.Getenv("LOG_LEVEL"))
var Logger = GetLogger(context.Background(), LoggerConfig{
Level: zapcore.Level(level),
Level: zap.DebugLevel,
Filename: "./tmp/error.log",
MaxSize: 100,
MaxAge: 90,