Compare commits
62 Commits
feature/ku ... main
| Author | SHA1 | Date |
|---|---|---|
|  | 5ca1c28b33 |  |
|  | d73292ac54 |  |
|  | f0df5ff937 |  |
|  | 83c4b5ded2 |  |
|  | 89647f3b5b |  |
|  | dedbe00fba |  |
|  | 62c340bc64 |  |
|  | 44ff3627fc |  |
|  | 392004d99a |  |
|  | 22c7d712f4 |  |
|  | 2dd625ec7c |  |
|  | c991215a9d |  |
|  | 1c32bfbbe0 |  |
|  | f210090dff |  |
|  | 54008669cb |  |
|  | 830e265162 |  |
|  | 7f78faeed7 |  |
|  | f2b627cee2 |  |
|  | 4d6b06efe7 |  |
|  | 6f91ad607f |  |
|  | b33dc0ba1d |  |
|  | 556d9ff6af |  |
|  | 346f69100c |  |
|  | 75d0557286 |  |
|  | 012c525915 |  |
|  | 550b6882e1 |  |
|  | 55dbe9f8c2 |  |
|  | 1ff69e086f |  |
|  | e60b96cac7 |  |
|  | 222d70125c |  |
|  | 9629e70ed7 |  |
|  | e32addcea5 |  |
|  | 625d5056e6 |  |
|  | 6591748d16 |  |
|  | 8f3c32ed95 |  |
|  | 4bbb027c41 |  |
|  | 896d45e15a |  |
|  | 322877ad8e |  |
|  | 5066dc650f |  |
|  | c977c8a15d |  |
|  | ef7b16af68 |  |
|  | 14aba80181 |  |
|  | 4cd00f25f0 |  |
|  | 0df874b222 |  |
|  | 4dd82c6380 |  |
|  | e4f756666d |  |
|  | 0bc45845dc |  |
|  | b8e051911e |  |
|  | b6e2838756 |  |
|  | 8fa99f4ea8 |  |
|  | 23a4de92e4 |  |
|  | 37936c8c58 |  |
|  | 4ed533f284 |  |
|  | 00030195c8 |  |
|  | 7e33fd6cef |  |
|  | 5bc047dbd1 |  |
|  | 07b8a36e63 |  |
|  | 5e3f093f08 |  |
|  | 03fe9910a3 |  |
|  | 438296ec35 |  |
|  | 6a475c7755 |  |
|  | a3a806a54f |  |
.vscode/launch.json (vendored, 2 changes)
@@ -10,7 +10,7 @@
      "request": "launch",
      "mode": "auto",
      "env": {
-       "FLINK_API_URL": "127.0.0.1:8081",
+       "FLINK_API_URL": "flink.bz2:8081",
        "SAVEPOINT_PATH": "/opt/flink/savepoints"
      },
      "cwd": "${workspaceFolder}",
.vscode/settings.json (vendored, 3 changes)
@@ -8,12 +8,13 @@
    "flink",
    "gitea",
    "gonanoid",
    "huma",
    "logicamp",
    "Namespaceable",
    "nindent",
    "reactivex",
    "repsert",
    "rxgo",
    "taskmanager",
    "tolerations"
  ]
}
@@ -1,8 +1,8 @@
-FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build
+FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build

ARG upx_version=4.2.4

-RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \
+RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
  curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
  cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
  chmod +x /usr/local/bin/upx && \
@@ -20,13 +20,14 @@ COPY . .

# Build
ENV GOCACHE=/root/.cache/go-build
-RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
-RUN upx -q -5 /flink-kube-operator
+RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
+  && upx -q -9 /flink-kube-operator

FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final

COPY --from=build /flink-kube-operator /flink-kube-operator
+COPY --from=build /etc/ssl/certs /etc/ssl/certs

EXPOSE 8083
@@ -1,4 +1,4 @@
-FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17
+FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17

# Set working directory
WORKDIR /opt/flink
@@ -15,8 +15,19 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh

RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
-RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
-RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
+RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/

# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]
README.md (new file, 6 lines)
@@ -0,0 +1,6 @@
Installation:

```bash
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
```
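Once the operator is installed, jobs are declared as FlinkJob custom resources (the CRD and a full example manifest appear later in this diff). A minimal sketch, with placeholder names and a placeholder jar URL:

```yaml
# Hypothetical FlinkJob instance; field names follow the CRD in this diff,
# but the metadata, entry class, and jar URL are illustrative placeholders.
apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob
metadata:
  name: example-job
  namespace: default
spec:
  key: example-job
  name: "Example Job"
  entryClass: "com.example.Main"
  parallelism: 1
  jarUri: "https://example.com/example-job.jar"
```

Applying this with `kubectl apply -f flinkjob.yaml` should let the operator pick the job up and submit it to the managed Flink cluster.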
@@ -2,7 +2,8 @@ package main

import (
	"flink-kube-operator/internal/crd"
	"flink-kube-operator/internal/manager"
+	"flink-kube-operator/internal/managed_job"
	"flink-kube-operator/internal/rest"
	"flink-kube-operator/pkg"
	"log"
	"os"
@@ -30,7 +31,7 @@ func main() {
	pkg.Logger.Info("[main]", zap.Any("cluster-config", clusterConfig))

	// init flink job manager
-	manager.NewManager(c, crdInstance)
+	managed_job.NewManager(c, crdInstance)

	// for _, jobDef := range config.Jobs {
	// 	managed_job.NewManagedJob(c, db, jobDef)
@@ -48,6 +49,8 @@ func main() {
	// // pkg.Logger.Debug("[main]", zap.Any("job", job))
	// }

+	go rest.Init()
+
	cancelChan := make(chan os.Signal, 1)
	sig := <-cancelChan
	log.Printf("Caught signal %v", sig)
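The shutdown wiring in this hunk reads from cancelChan, but the signal registration itself falls outside the visible lines. For reference, the usual Go pattern looks like the following sketch, which is not necessarily the exact elided code:

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Buffered channel so a signal arriving before the receive isn't dropped.
	cancelChan := make(chan os.Signal, 1)
	// Registration step not visible in the hunk above; presumably something like:
	signal.Notify(cancelChan, syscall.SIGINT, syscall.SIGTERM)

	// Block until a signal arrives, then shut down.
	sig := <-cancelChan
	log.Printf("Caught signal %v", sig)
}
```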
@@ -36,6 +36,14 @@ spec:
                type: integer
              jarUri:
                type: string
+             jarURIBasicAuthUsername:
+               type: string
+             jarURIBasicAuthPassword:
+               type: string
              args:
                type: array
                items:
                  type: string
+             savepointInterval:
+               type: string
+               format: duration
@@ -3,13 +3,14 @@ apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob
metadata:
  name: my-flink-job
  namespace: default
spec:
  key: word-count
  name: "Word Count Example"
-  entryClass: "org.apache.flink.examples.java.wordcount.WordCount"
-  parallelism: 2
-  jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar"
+  entryClass: "tech.logicamp.logiline.FacilityEnrichment"
+  parallelism: 1
+  jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
+  jarURIBasicAuthUsername: logiline-actrunner
+  jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong
  flinkConfiguration:
-    taskmanager.numberOfTaskSlots: "2"
-    parallelism.default: "2"
+    taskmanager.numberOfTaskSlots: "1"
+    parallelism.default: "1"
go.mod (19 changes)
@@ -3,8 +3,9 @@ module flink-kube-operator
go 1.23.2

require (
  github.com/danielgtaylor/huma/v2 v2.27.0
  github.com/gofiber/fiber/v2 v2.52.6
  github.com/logi-camp/go-flink-client v0.2.0
  github.com/matoous/go-nanoid/v2 v2.1.0
  github.com/samber/lo v1.47.0
  go.uber.org/zap v1.27.0
  k8s.io/apimachinery v0.31.3
@@ -12,6 +13,7 @@ require (
)

require (
  github.com/andybalholm/brotli v1.1.1 // indirect
  github.com/emicklei/go-restful/v3 v3.11.0 // indirect
  github.com/evanphx/json-patch/v5 v5.9.0 // indirect
  github.com/go-openapi/jsonpointer v0.21.0 // indirect
@@ -21,16 +23,20 @@ require (
  github.com/google/gnostic-models v0.6.9 // indirect
  github.com/google/uuid v1.6.0 // indirect
  github.com/josharian/intern v1.0.0 // indirect
  github.com/klauspost/compress v1.17.11 // indirect
  github.com/mailru/easyjson v0.7.7 // indirect
  github.com/mattn/go-runewidth v0.0.16 // indirect
  github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
  github.com/rivo/uniseg v0.4.7 // indirect
  github.com/valyala/bytebufferpool v1.0.0 // indirect
  github.com/valyala/fasthttp v1.58.0 // indirect
  github.com/valyala/tcplisten v1.0.0 // indirect
  golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
  google.golang.org/protobuf v1.35.1 // indirect
)

require (
  github.com/cenkalti/backoff/v4 v4.3.0 // indirect
  github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
  github.com/emirpasic/gods v1.12.0 // indirect
  github.com/fxamacker/cbor/v2 v2.7.0 // indirect
  github.com/go-logr/logr v1.4.2 // indirect
  github.com/gogo/protobuf v1.3.2 // indirect
@@ -43,17 +49,12 @@ require (
  github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
  github.com/modern-go/reflect2 v1.0.2 // indirect
  github.com/pkg/errors v0.9.1 // indirect
  github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
  github.com/reactivex/rxgo/v2 v2.5.0
  github.com/spf13/pflag v1.0.5 // indirect
  github.com/stretchr/objx v0.5.2 // indirect
  github.com/stretchr/testify v1.9.0 // indirect
  github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 // indirect
  github.com/x448/float16 v0.8.4 // indirect
  go.uber.org/multierr v1.11.0 // indirect
  golang.org/x/net v0.31.0 // indirect
  golang.org/x/oauth2 v0.21.0 // indirect
-  golang.org/x/sys v0.27.0 // indirect
+  golang.org/x/sys v0.29.0 // indirect
  golang.org/x/term v0.26.0 // indirect
  golang.org/x/text v0.20.0 // indirect
  golang.org/x/time v0.6.0 // indirect
go.sum (51 changes)
@@ -1,15 +1,14 @@
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/danielgtaylor/huma/v2 v2.27.0 h1:yxgJ8GqYqKeXw/EnQ4ZNc2NBpmn49AlhxL2+ksSXjUI=
github.com/danielgtaylor/huma/v2 v2.27.0/go.mod h1:NbSFXRoOMh3BVmiLJQ9EbUpnPas7D9BeOxF/pZBAGa0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
@@ -28,6 +27,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI=
github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
@@ -52,7 +53,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
@@ -62,15 +64,17 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
-github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
+github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
+github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE=
github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -87,8 +91,9 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/reactivex/rxgo/v2 v2.5.0 h1:FhPgHwX9vKdNQB2gq9EPt+EKk9QrrzoeztGbEEnZam4=
github.com/reactivex/rxgo/v2 v2.5.0/go.mod h1:bs4fVZxcb5ZckLIOeIeVH942yunJLWDABWGbrHAW+qU=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
@@ -98,22 +103,24 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 h1:BLNsFR8l/hj/oGjnJXkd4Vi3s4kQD3/3x8HSAE4bzN0=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775/go.mod h1:XUZ4x3oGhWfiOnUvTslnKKs39AWUct3g3yJvXTQSJOQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.58.0 h1:GGB2dWxSbEprU9j0iMJHgdKYJVDyjrOwF9RE59PbRuE=
github.com/valyala/fasthttp v1.58.0/go.mod h1:SYXvHHaFp7QZHGKSHmoMipInhrI5StHrhDTYVEjK/Kw=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -125,10 +132,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -140,15 +145,13 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
-golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
+golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -158,8 +161,6 @@ golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -172,7 +173,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
@@ -181,7 +181,6 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -1,24 +0,0 @@
apiVersion: v2
name: flink-kube-operator
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"
helm/chart/Chart.lock (new file, 6 lines)
@@ -0,0 +1,6 @@
dependencies:
- name: minio
  repository: https://charts.bitnami.com/bitnami
  version: 16.0.2
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
generated: "2025-04-04T14:42:09.771390014+03:30"
helm/chart/Chart.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 1.2.3
appVersion: "0.1.1"
@@ -17,6 +17,6 @@
{{- else if contains "ClusterIP" .Values.service.type }}
  export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
  export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
-  echo "Visit http://127.0.0.1:8080 to use your application"
-  kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
+  echo "Visit http://127.0.0.1:8081 to use your application"
+  kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
{{- end }}
helm/chart/templates/flink/checkpoint-pvc.yaml (new file, 12 lines)
@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}
helm/chart/templates/flink/config.yaml (new file, 43 lines)
@@ -0,0 +1,43 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
  value: "localhost"
- name: NAMESPACE
  value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
  value: |
    jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
    jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
    taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
    taskmanager.data.port: 6125
    taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
    parallelism.default: {{ .Values.flink.parallelism.default }}
    state.backend: {{ .Values.flink.state.backend }}
    rest.port: 8081
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    high-availability.type: kubernetes
    kubernetes.namespace: {{ .Release.Namespace }}
    kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
    execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
    execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
    {{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
    state.checkpoints.dir: file:///opt/flink/checkpoints/
    {{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
    state.checkpoints.dir: s3://flink/checkpoints/
    {{- end }}
    state.backend.rocksdb.localdir: /opt/flink/rocksdb
    high-availability.storageDir: /opt/flink/ha
    {{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
    state.savepoints.dir: file:///opt/flink/savepoints/
    {{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
    state.savepoints.dir: s3://flink/savepoints/
    {{- end }}
    state.backend.incremental: {{ .Values.flink.state.incremental }}
    rest.profiling.enabled: true
    {{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
    s3.endpoint: http://{{ .Release.Name }}-minio:9000
    s3.path.style.access: true
    {{- end }}
    {{- toYaml .Values.flink.properties | default "" | nindent 4 }}

{{- end }}
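For orientation, with the defaults from values.yaml later in this diff (s3 storage for both checkpoints and savepoints, rocksdb backend), this helper would render an env entry roughly like the following. This is a sketch: exact whitespace comes from Helm's nindent, and "myrelease" is a placeholder release name.

```yaml
- name: FLINK_PROPERTIES
  value: |
    jobmanager.rpc.address: myrelease-flink-job-manager
    taskmanager.numberOfTaskSlots: 12
    parallelism.default: 1
    state.backend: rocksdb
    high-availability.type: kubernetes
    state.checkpoints.dir: s3://flink/checkpoints/
    state.savepoints.dir: s3://flink/savepoints/
    s3.endpoint: http://myrelease-minio:9000
    s3.path.style.access: true
```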
helm/chart/templates/flink/ha.pvc.yaml (new file, 10 lines)
@@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ .Release.Name }}-flink-ha-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: {{ .Values.flink.state.ha.size }} # Use size defined in values.yaml
helm/chart/templates/flink/job-manager-deploy.yaml (new file, 102 lines)
@@ -0,0 +1,102 @@

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-flink-job-manager
  labels:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-job-manager
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: {{ .Release.Name }}-flink-operator
      component: {{ .Release.Name }}-flink-job-manager
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-flink-operator
        component: {{ .Release.Name }}-flink-job-manager
    spec:
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
      initContainers:
        - name: volume-mount-hack
          image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
          runAsUser: 0
          command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
          volumeMounts:
            - name: flink-ha
              mountPath: {{ .Values.flink.state.ha.dir }}
      containers:
        - name: jobmanager
          image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
          imagePullPolicy: Always
          args: ["jobmanager"]
          ports:
            - containerPort: 6123 # JobManager RPC port
              name: rpc
            - containerPort: 6124 # JobManager blob server port
              name: blob
            - containerPort: 6125 # JobManager queryable state port
              name: query
            - containerPort: 8081 # JobManager Web UI port
              name: ui
          env:
            {{- include "flink.env" . | nindent 12 }}
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            {{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
            - name: S3_ENDPOINT
              value: "http://minio-service:9000"
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-flink-secrets
                  key: minio_access_key
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-flink-secrets
                  key: minio_secret_key
            {{- end }}
          volumeMounts:
            - name: flink-ha
              mountPath: {{ .Values.flink.state.ha.dir }}
            {{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
            - name: flink-checkpoint
              mountPath: /opt/flink/checkpoints
            {{- end }}
            {{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
            - name: flink-savepoint
              mountPath: /opt/flink/savepoints
            {{- end }}
      volumes:
        - name: flink-ha
          persistentVolumeClaim:
            claimName: {{ .Release.Name }}-flink-ha-pvc
        {{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
        - name: flink-checkpoint
          persistentVolumeClaim:
            claimName: {{ .Release.Name }}-flink-checkpoint-pvc
        {{- end }}
        {{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
        - name: flink-savepoint
          persistentVolumeClaim:
            claimName: {{ .Release.Name }}-flink-savepoint-pvc
        {{- end }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
helm/chart/templates/flink/job-manager-service.yaml (new file, 28 lines)
@@ -0,0 +1,28 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}-flink-job-manager
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
    app.kubernetes.io/instance: {{ .Release.Name }}
spec:
  ports:
    - name: flink-web-ui
      port: 8081
      targetPort: 8081
    - name: rpc
      port: 6123
      targetPort: 6123
    - name: blob
      port: 6124
      targetPort: 6124
    - name: query
      port: 6125
      targetPort: 6125
    - name: operator
      port: 3000
      targetPort: 3000
  selector:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-job-manager
  type: ClusterIP # Change to LoadBalancer if you want external access
helm/chart/templates/flink/savepoint-pvc.yaml (new file, 12 lines)
@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}
helm/chart/templates/flink/task-manager-statefulset.yaml (new file, 80 lines)
@@ -0,0 +1,80 @@

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ .Release.Name }}-flink-task-manager
  labels:
    app: {{ .Release.Name }}-flink-operator
    component: taskmanager
spec:
  serviceName: {{ .Release.Name }}-flink-task-manager
  replicas: {{ .Values.flink.taskManager.replicas }}
  selector:
    matchLabels:
      app: {{ .Release.Name }}-flink-operator
      component: {{ .Release.Name }}-flink-task-manager
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-flink-operator
        component: {{ .Release.Name }}-flink-task-manager
    spec:
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
      containers:
        - name: task-manager
          image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
          imagePullPolicy: Always
          args: ["taskmanager"]
          env:
            {{- include "flink.env" . | nindent 8 }}
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            {{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
            - name: S3_ENDPOINT
              value: "http://minio-service:9000"
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-flink-secrets
                  key: minio_access_key
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-flink-secrets
                  key: minio_secret_key
            {{- end }}
          volumeMounts:
            - name: rocksdb-storage
              mountPath: /opt/flink/rocksdb
            {{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
            - name: flink-checkpoint
              mountPath: /opt/flink/checkpoints
            {{- end }}
            {{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
            - name: flink-savepoint
              mountPath: /opt/flink/savepoints
            {{- end }}
          resources:
            {{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
      volumes:
        {{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
        - name: flink-checkpoint
          persistentVolumeClaim:
            claimName: {{ .Release.Name }}-flink-checkpoint-pvc
        {{- end }}
        {{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
        - name: flink-savepoint
          persistentVolumeClaim:
            claimName: {{ .Release.Name }}-flink-savepoint-pvc
        {{- end }}

  volumeClaimTemplates:
    - metadata:
        name: rocksdb-storage
      spec:
        accessModes: [ ReadWriteOnce ]
        resources:
          requests:
            storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}
@@ -2,7 +2,6 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ include "flink-kube-operator.serviceAccountName" . }}
-  namespace: {{ .Release.Namespace }} # Namespace where the role is created
  labels:
    {{- include "flink-kube-operator.labels" . | nindent 4 }}
rules:
helm/chart/templates/operator/service.yaml (new file, 17 lines)
@@ -0,0 +1,17 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ .Release.Name }}-flink-operator
  labels:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-operator
spec:
  type: {{ .Values.service.type }}
  ports:
    - port: {{ .Values.service.port }}
      targetPort: http
      protocol: TCP
      name: http
  selector:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-operator
helm/chart/templates/operator/statefulset.yaml (new file, 70 lines)
@@ -0,0 +1,70 @@

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {{ .Release.Name }}-flink-operator
  labels:
    app: {{ .Release.Name }}-flink-operator
    component: {{ .Release.Name }}-flink-operator
spec:
  serviceName: {{ .Release.Name }}-flink-operator
  replicas: 1
  selector:
    matchLabels:
      app: {{ .Release.Name }}-flink-operator
      component: {{ .Release.Name }}-flink-operator
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}-flink-operator
        component: {{ .Release.Name }}-flink-operator
    spec:
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
      initContainers:
        - name: wait-for-jobmanager
          image: curlimages/curl:8.5.0 # Lightweight curl image
          command:
            - sh
            - -c
            - |
              echo "Waiting for Flink JobManager to be ready..."
              until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
                echo "JobManager not ready yet - retrying in 5s..."
                sleep 5
              done
              echo "JobManager is ready!"
      containers:
        - name: operator
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - name: http
              containerPort: {{ .Values.service.port }}
              protocol: TCP
          env:
            - name: FLINK_API_URL
              value: {{ .Release.Name }}-flink-job-manager:8081
            - name: NAMESPACE
              value: "{{ .Release.Namespace }}"
            {{- if eq .Values.flink.state.savepoint.storageType "s3" }}
            - name: SAVEPOINT_PATH
              value: s3://flink/savepoints/
            - name: S3_ENDPOINT
              value: "http://{{ .Release.Name }}-minio:9000"
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-minio
                  key: root-user
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-minio
                  key: root-password
            {{- else }}
            - name: SAVEPOINT_PATH
              value: /opt/flink/savepoints/
            {{- end }}
@@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}

-podSecurityContext: {}
-  # fsGroup: 2000
+podSecurityContext: {} # fsGroup: 2000

securityContext: {}
  # capabilities:
@@ -54,7 +53,7 @@ service:
  # This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
  type: ClusterIP
  # This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
-  port: 80
+  port: 3000

# This block is for setting up the ingress for more information can be found here: https://kubernetes.io/docs/concepts/services-networking/ingress/
ingress:
@@ -64,10 +63,10 @@
    # kubernetes.io/ingress.class: nginx
    # kubernetes.io/tls-acme: "true"
  hosts:
-    - host: chart-example.local
-      paths:
-        - path: /
-          pathType: ImplementationSpecific
+    - host: chart-example.local
+      paths:
+        - path: /
+          pathType: ImplementationSpecific
  tls: []
  #  - secretName: chart-example-tls
  #    hosts:
@@ -106,7 +105,6 @@
config:
  flinkApiUrl: flink:8081

-
nodeSelector: {}

tolerations: []
@@ -117,22 +115,44 @@ affinity: {}
flink:
  image:
    repository: lcr.logicamp.tech/library/flink
-    tag: 1.20.0-scala_2.12-java17-minicluster
+    tag: 1.20.1-scala_2.12-java17-minicluster

  parallelism:
-    default: 1 # Default parallelism for Flink jobs
+    default: 1 # Default parallelism for Flink jobs

  state:
-    backend: rocksdb # Use RocksDB for state backend
-    savepoints:
-      dir: "file:///opt/flink/savepoints" # Directory to store savepoints
-      pvcName: flink-savepoints-pvc # PVC for savepoints persistence
+    backend: rocksdb # Use RocksDB for state backend
+    incremental: true
+    ha:
+      dir: "/opt/flink/ha" # Directory to store ha data
+      pvcName: flink-ha-pvc # PVC for ha
+      size: 10Gi # PVC size for ha
+    checkpoint:
+      storageType: s3 # s3 / filesystem
+      interval: 5min
+      mode: EXACTLY_ONCE
+      size: 8Gi
+    savepoint:
+      storageType: s3
+      size: 8Gi

  jobManager:
    processMemory: 4096m # Size of job manager process memory

+  properties:
+    jobmanager.rpc.timeout: 300s

  taskManager:
-    numberOfTaskSlots: 100 # Number of task slots for TaskManager
-
-    persistence:
-      enabled: true
-      size: 10Gi # PVC size for savepoints storage
-      accessModes:
-        - ReadWriteOnce
+    numberOfTaskSlots: 12 # Number of task slots for task manager
+    processMemory: 4096m # Size of task manager process memory
+    replicas: 1 # Number of task manager replicas
+    storage:
+      rocksDb:
+        size: 4Gi
+    resources:
+      limits:
+        cpu: 3
+        memory: 4Gi
+      requests:
+        cpu: 1
+        memory: 2Gi
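Any of these values can be overridden at install time with standard Helm flags. For example, scaling TaskManagers and switching savepoints to the filesystem backend would look roughly like this sketch, using keys defined in the values above:

```bash
helm install flink-kube-operator lc-flink-operator/flink-kube-operator \
  --set flink.taskManager.replicas=2 \
  --set flink.state.savepoint.storageType=filesystem \
  --set flink.state.savepoint.size=16Gi
```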
BIN helm/flink-kube-operator-0.1.0.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.1.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.10.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.11.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.12.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.13.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.14.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.2.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.3.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.4.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.5.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.6.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.7.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.8.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-0.1.9.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.0.0.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.0.1.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.1.0.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.1.1.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.1.2.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.2.0.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.2.1.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.2.2.tgz (new file, binary not shown)
BIN helm/flink-kube-operator-1.2.3.tgz (new file, binary not shown)
helm/index.yaml (new file, 240 lines)
@@ -0,0 +1,240 @@
apiVersion: v1
entries:
  flink-kube-operator:
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-07-18T18:09:46.27166563+03:30"
    description: Helm chart for flink kube operator
    digest: 597f2c07884bb5411dcc6e1a9cdf7672977858efe30273a46fb6525eb6013091
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.2.3.tgz
    version: 1.2.3
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-05-17T14:34:55.317942453+03:30"
    description: Helm chart for flink kube operator
    digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.2.2.tgz
    version: 1.2.2
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-05-17T14:01:29.891695937+03:30"
    description: Helm chart for flink kube operator
    digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.2.1.tgz
    version: 1.2.1
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-05-17T12:47:25.848097207+03:30"
    dependencies:
    - name: minio
      repository: https://charts.bitnami.com/bitnami
      version: 16.0.2
    description: Helm chart for flink kube operator
    digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.2.0.tgz
    version: 1.2.0
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-04-15T12:06:59.425538953+03:30"
    dependencies:
    - name: minio
      repository: https://charts.bitnami.com/bitnami
      version: 16.0.2
    description: Helm chart for flink kube operator
    digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.1.2.tgz
    version: 1.1.2
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-04-12T23:13:39.394371646+03:30"
    dependencies:
    - name: minio
      repository: https://charts.bitnami.com/bitnami
      version: 16.0.2
    description: Helm chart for flink kube operator
    digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.0.1.tgz
    version: 1.0.1
  - apiVersion: v2
    appVersion: 0.1.1
    created: "2025-04-06T01:52:09.478716316+03:30"
    dependencies:
    - name: minio
      repository: https://charts.bitnami.com/bitnami
      version: 16.0.2
    description: Helm chart for flink kube operator
    digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-1.0.0.tgz
    version: 1.0.0
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-04-04T13:50:27.971040367+03:30"
    description: Helm chart for flink kube operator
    digest: 00acef7878bcf372d036fabaac400913097d678087a756102b54a28428197bdf
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.14.tgz
    version: 0.1.14
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T23:13:19.698003661+03:30"
    description: Helm chart for flink kube operator
    digest: d104b9242362415a7b920e4e2af975730e208ff73db17b8d2afd11ea8b78b4a2
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.13.tgz
    version: 0.1.13
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T23:13:19.697555829+03:30"
    description: Helm chart for flink kube operator
    digest: f58802990389ecde00a49a442f6e83a007e281e972d07f2979657d2763fe94ba
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.12.tgz
    version: 0.1.12
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.491747333+03:30"
    description: Helm chart for flink kube operator
    digest: 0daa98c63b443018c2072a2d7448c972faff2274fb04433c613532b408cd3ab1
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.11.tgz
    version: 0.1.11
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.490697387+03:30"
    description: Helm chart for flink kube operator
    digest: e091256eeb8640b61443cbe4781426ef493737ab0ac1145e568426bb2c1ed3ba
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.10.tgz
    version: 0.1.10
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.495842696+03:30"
    description: Helm chart for flink kube operator
    digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.9.tgz
    version: 0.1.9
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.495392608+03:30"
    description: Helm chart for flink kube operator
    digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.8.tgz
    version: 0.1.8
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.494948853+03:30"
    description: Helm chart for flink kube operator
    digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.7.tgz
    version: 0.1.7
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.49450822+03:30"
    description: Helm chart for flink kube operator
    digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.6.tgz
    version: 0.1.6
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.494040193+03:30"
    description: Helm chart for flink kube operator
    digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.5.tgz
    version: 0.1.5
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.493584927+03:30"
    description: Helm chart for flink kube operator
    digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.4.tgz
    version: 0.1.4
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.493138547+03:30"
    description: Helm chart for flink kube operator
    digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.3.tgz
    version: 0.1.3
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.492696005+03:30"
    description: Helm chart for flink kube operator
    digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.2.tgz
    version: 0.1.2
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.490170385+03:30"
    description: Helm chart for flink kube operator
    digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.1.tgz
    version: 0.1.1
  - apiVersion: v2
    appVersion: 0.1.0
    created: "2025-03-04T18:04:35.489734651+03:30"
    description: Helm chart for flink kube operator
    digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
    name: flink-kube-operator
    type: application
    urls:
    - flink-kube-operator-0.1.0.tgz
    version: 0.1.0
generated: "2025-07-18T18:09:46.244672127+03:30"
@ -1,71 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
    app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
      app.kubernetes.io/instance: {{ .Release.Name }}
  template:
    metadata:
      labels:
        app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
        app.kubernetes.io/instance: {{ .Release.Name }}
    spec:
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}

      containers:
        - name: flink
          image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
          imagePullPolicy: Always
          ports:
            - containerPort: 8081 # JobManager Web UI port
            - containerPort: 6121 # TaskManager communication port
            - containerPort: 6122 # TaskManager communication port
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: "localhost" # JobManager and TaskManager in the same container
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: localhost
                jobmanager.memory.process.size: 2048m
                taskmanager.memory.process.size: 2048m
                taskmanager.data.port: 6125
                taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
                parallelism.default: {{ .Values.flink.parallelism.default }}
                state.backend: {{ .Values.flink.state.backend }}
                state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
                rest.port: 8081
                rootLogger.level = DEBUG
                rootLogger.appenderRef.console.ref = ConsoleAppender
                web.upload.dir: /opt/flink/data/web-upload
                state.checkpoints.dir: file:///tmp/flink-checkpoints
                high-availability.type: kubernetes
                high-availability.storageDir: file:///opt/flink/ha
                kubernetes.cluster-id: cluster-one
                kubernetes.namespace: {{ .Release.Namespace }}
          volumeMounts:
            - name: flink-data
              mountPath: /opt/flink/data
              subPath: data
            - name: flink-data
              mountPath: /opt/flink/web-upload
              subPath: web-upload
            - name: flink-savepoints
              mountPath: /opt/flink/savepoints
            - name: flink-savepoints
              mountPath: /opt/flink/ha
              subPath: ha

      volumes:
        - name: flink-data
          emptyDir: {} # Temporary storage for internal data
        - name: flink-savepoints
          persistentVolumeClaim:
            claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml
@ -1,15 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  name: flink # Adding the flink prefix to the service name
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
    app.kubernetes.io/instance: {{ .Release.Name }}
spec:
  ports:
    - port: 8081
      targetPort: 8081
  selector:
    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
    app.kubernetes.io/instance: {{ .Release.Name }}
  type: ClusterIP # Change to LoadBalancer if you want external access
@ -1,61 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "flink-kube-operator.fullname" . }}
  labels:
    {{- include "flink-kube-operator.labels" . | nindent 4 }}
spec:
  {{- if not .Values.autoscaling.enabled }}
  replicas: {{ .Values.replicaCount }}
  {{- end }}
  selector:
    matchLabels:
      {{- include "flink-kube-operator.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      {{- with .Values.podAnnotations }}
      annotations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      labels:
        {{- include "flink-kube-operator.labels" . | nindent 8 }}
        {{- with .Values.podLabels }}
        {{- toYaml . | nindent 8 }}
        {{- end }}
    spec:
      {{- with .Values.imagePullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
      securityContext:
        {{- toYaml .Values.podSecurityContext | nindent 8 }}
      containers:
        - name: {{ .Chart.Name }}
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - name: http
              containerPort: {{ .Values.service.port }}
              protocol: TCP
          env:
            - name: FLINK_API_URL
              value: {{ .Values.config.flinkApiUrl }}
          resources:
            {{- toYaml .Values.resources | nindent 12 }}

      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
@ -1,15 +0,0 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ include "flink-kube-operator.fullname" . }}
  labels:
    {{- include "flink-kube-operator.labels" . | nindent 4 }}
spec:
  type: {{ .Values.service.type }}
  ports:
    - port: {{ .Values.service.port }}
      targetPort: http
      protocol: TCP
      name: http
  selector:
    {{- include "flink-kube-operator.selectorLabels" . | nindent 4 }}
@ -1,14 +1,61 @@
package crd

import (
	"github.com/reactivex/rxgo/v2"
	"context"
	"flink-kube-operator/internal/crd/v1alpha1"
	"flink-kube-operator/pkg"

	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
	for j := range jobEventObservable.Observe() {
		jobEvent := j.V.(*FlinkJobCrdEvent)
		//pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName()))
		controllerutil.AddFinalizer(jobEvent.Job, "")
var FinalizerChannel chan (types.UID) = make(chan (types.UID))

func (crd Crd) manageFinalizer(jobEventChannel chan FlinkJobCrdEvent) {

	finalizerName := "flink-operator.logicamp.tech/finalizer"
	for jobEvent := range jobEventChannel {
		pkg.Logger.Debug("[crd] [manage-finalizer] main loop", zap.String("name", jobEvent.Job.Name))
		go func() {
			if jobEvent.Job.GetDeletionTimestamp() != nil {
				// Resource is being deleted
				if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
					// Perform cleanup
					pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
					if err := crd.cleanupResources(jobEvent.Job); err != nil {
						pkg.Logger.Info("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
						return
					}

					// Remove finalizer
					controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
					if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
						pkg.Logger.Info("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
						return
					}
					pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))

				}
				return
			}

			// Add finalizer if not present
			if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
				controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
				pkg.Logger.Debug("[finalizer] adding job")
				// Update the resource to add the finalizer
				if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
					pkg.Logger.Info("[finalizer] failed to add", zap.Error(err))
					return
				}
			}

		}()
	}
}

func (crd Crd) cleanupResources(job *v1alpha1.FlinkJob) error {
	FinalizerChannel <- job.GetUID()
	return nil
}
@ -2,44 +2,59 @@ package crd

import (
	"flink-kube-operator/internal/crd/v1alpha1"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

type Crd struct {
	client dynamic.NamespaceableResourceInterface
	client        dynamic.ResourceInterface
	runtimeClient client.Client
}

func New() *Crd {
	// Get Kubernetes config
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	// Get Kubernetes config_
	config_, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		config, err = rest.InClusterConfig()
		config_, err = rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
	}

	// Create dynamic client
	dynamicClient, err := dynamic.NewForConfig(config)
	dynamicClient, err := dynamic.NewForConfig(config_)
	if err != nil {
		panic(err)
	}

	scheme := runtime.NewScheme()
	v1alpha1.AddKnownTypes(scheme)
	// Get FlinkJob resource interface
	flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR)

	flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR).Namespace(os.Getenv("NAMESPACE"))
	runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{
		Scheme: scheme,
	})
	if err != nil {
		panic(err)
	}
	crd := Crd{
		client: flinkJobClient,
		client:        flinkJobClient,
		runtimeClient: runtimeClient,
	}

	// Watch for FlinkJob creation
	jobEventObservable := crd.watchFlinkJobs()
	jobEventCh := make(chan FlinkJobCrdEvent)

	// add finalizer to new resources
	go crd.manageFinalizer(jobEventObservable)
	go crd.manageFinalizer(jobEventCh)

	// Watch for FlinkJob creation
	crd.watchFlinkJobs(jobEventCh)

	return &crd
}
@ -14,6 +14,7 @@ import (

func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
	job := GetJob(jobUid)
	// pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))

	patchBytes, err := json.Marshal(patchData)
	if err != nil {
@ -22,7 +23,6 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error

	// Patch the status sub-resource
	unstructuredJob, err := crd.client.
		Namespace(job.GetNamespace()).
		Patch(
			context.Background(),
			job.GetName(),
@ -14,6 +14,10 @@ func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
	jobs.Store(job.GetUID(), job)
}

func (crd *Crd) remove(uid types.UID) {
	jobs.Delete(uid)
}

func GetJob(uid types.UID) v1alpha1.FlinkJob {
	job, _ := jobs.Load(uid)
	return *job.DeepCopy()
@ -10,12 +10,15 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE

type FlinkJobSpec struct {
	Key               string          `json:"key"`
	Name              string          `json:"name"`
	Parallelism       int             `json:"parallelism"`
	JarURI            string          `json:"jarUri"`
	SavepointInterval metaV1.Duration `json:"savepointInterval"`
	EntryClass        string          `json:"entryClass"`
	Key                     string          `json:"key"`
	Name                    string          `json:"name"`
	Parallelism             int             `json:"parallelism"`
	JarURI                  string          `json:"jarUri"`
	JarURIBasicAuthUsername *string         `json:"jarURIBasicAuthUsername"`
	JarURIBasicAuthPassword *string         `json:"jarURIBasicAuthPassword"`
	SavepointInterval       metaV1.Duration `json:"savepointInterval"`
	EntryClass              string          `json:"entryClass"`
	Args                    []string        `json:"args"`
}

type FlinkJobStatus struct {
@ -54,6 +57,7 @@ var (
	ErrNoJarId              = errors.New("[managed-job] no jar id")
	ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
	ErrNoSavepointPath      = errors.New("[managed-job] no savepoint path")
	ErrOnStartingJob        = errors.New("[managed-job] error on starting job")
)

type JobStatus string
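For reference, a minimal sketch of how a caller might populate the widened spec. Only the field names and types come from the diff above; the `main` wrapper and all values are hypothetical placeholders.

package main

import (
	"fmt"
	"time"

	"flink-kube-operator/internal/crd/v1alpha1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical credentials for a jar host that requires basic auth.
	// Both fields are optional pointers, so public jar URLs still work with nil.
	user, pass := "deploy", "s3cr3t"
	spec := v1alpha1.FlinkJobSpec{
		Key:                     "word-count",
		Name:                    "word-count",
		Parallelism:             2,
		JarURI:                  "https://repo.example.com/jobs/word-count.jar",
		JarURIBasicAuthUsername: &user,
		JarURIBasicAuthPassword: &pass,
		SavepointInterval:       metaV1.Duration{Duration: 10 * time.Minute},
		EntryClass:              "org.example.WordCount",
		Args:                    []string{"--input", "kafka"},
	}
	fmt.Println(spec.JarURI)
}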
@ -20,11 +20,11 @@ var FlinkJobGVR = schema.GroupVersionResource{
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func addKnownTypes(scheme *runtime.Scheme) error {
func AddKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&FlinkJob{},
		&FlinkJobList{},
@ -3,10 +3,10 @@ package crd
import (
	"context"
	"flink-kube-operator/internal/crd/v1alpha1"
	"os"

	"flink-kube-operator/pkg"

	"github.com/reactivex/rxgo/v2"
	"go.uber.org/zap"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -14,52 +14,57 @@ import (
	"k8s.io/apimachinery/pkg/watch"
)

func (crd Crd) watchFlinkJobs() rxgo.Observable {

	ch := make(chan rxgo.Item)
func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) {

	go func() {
		pkg.Logger.Debug("[crd] starting watch")
		watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
		if err != nil {
			panic(err)
		}
		defer watcher.Stop()
		for event := range watcher.ResultChan() {
			pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
			unstructuredJob := event.Object.(*unstructured.Unstructured)
			unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
		for {
			pkg.Logger.Debug("[crd] starting watch")
			watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
			if err != nil {
				pkg.Logger.Error("cannot create unstructured map", zap.Error(err))
				continue
				panic(err)
			}
			job := &v1alpha1.FlinkJob{}
			namespace := os.Getenv("NAMESPACE")
			pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
			for event := range watcher.ResultChan() {
				unstructuredJob := event.Object.(*unstructured.Unstructured)
				unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
				if err != nil {
					pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
					continue
				}
				job := &v1alpha1.FlinkJob{}

			err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
			if err != nil {
				pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err))
				continue
			}
				err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
				if err != nil {
					pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
					continue
				}
				if job.Namespace != namespace {
					continue
				}

			ch <- rxgo.Item{
				V: &FlinkJobCrdEvent{
					EventType: event.Type,
					Job:       job,
				},
			}
			switch event.Type {
			case watch.Bookmark:
			case watch.Modified:
				pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
				crd.repsert(job)
			case watch.Added:
				pkg.Logger.Info("[crd] [watch] new flink job created")
				crd.repsert(job)
			case watch.Deleted:
				go func() {
					ch <- FlinkJobCrdEvent{
						EventType: event.Type,
						Job:       job,
					}
				}()
				pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type)))
				switch event.Type {
				case watch.Bookmark:
				case watch.Modified:
					//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
					crd.repsert(job)
				case watch.Added:
					//pkg.Logger.Info("[crd] [watch] new flink job created")
					crd.repsert(job)
				case watch.Deleted:
					crd.remove(job.UID)
				}
			}
			defer watcher.Stop()
			pkg.Logger.Warn("[crd] [watch] Watcher stopped, restarting...")
		}

	}()

	return rxgo.FromChannel(ch)
}
@ -1,27 +1,34 @@
package jar

import (
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"io"
	"net/http"
	"net/http/cookiejar"
	"os"
	"strings"

	"flink-kube-operator/pkg"

	api "github.com/logi-camp/go-flink-client"
	gonanoid "github.com/matoous/go-nanoid/v2"
	"go.uber.org/zap"
)

type JarFile struct {
	uri      string
	filePath string
	uri               string
	filePath          string
	basicAuthUsername *string
	basicAuthPassword *string
}

func NewJarFile(URI string) (*JarFile, error) {
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
	jarFile := &JarFile{
		uri: URI,
		uri:               URI,
		basicAuthUsername: basicAuthUsername,
		basicAuthPassword: basicAuthPassword,
	}
	err := jarFile.Download()
	if err != nil {
@ -46,7 +53,9 @@ func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err er
}

func (jarFile *JarFile) Download() error {
	fileName, _ := gonanoid.New()
	randBytes := make([]byte, 16)
	rand.Read(randBytes)
	fileName := hex.EncodeToString(randBytes)
	jarFile.filePath = "/tmp/" + fileName + ".jar"
	out, err := os.Create(jarFile.filePath)
	if err != nil {
@ -54,9 +63,45 @@ func (jarFile *JarFile) Download() error {
	}

	defer out.Close()
	resp, err := http.Get(jarFile.uri)
	if err != nil || resp.StatusCode > 299 {

	var resp *http.Response
	if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {

		basicAuth := func(username, password string) string {
			auth := username + ":" + password
			return base64.StdEncoding.EncodeToString([]byte(auth))
		}

		redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
			req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
			return nil
		}

		client := &http.Client{
			Jar:           &cookiejar.Jar{},
			CheckRedirect: redirectPolicyFunc,
		}

		req, err := http.NewRequest("GET", jarFile.uri, nil)
		if err != nil {
			jarFile.delete()
			return err
		}
		req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
		resp, err = client.Do(req)
	} else {
		resp, err = http.Get(jarFile.uri)
	}
	if err != nil {
		jarFile.delete()
		pkg.Logger.Error("error in downloading jar", zap.Error(err))
		return err
	}
	if resp.StatusCode > 299 {
		respBody := []byte{}
		resp.Body.Read(respBody)
		err = errors.New(string(respBody) + " status:" + resp.Status)
		pkg.Logger.Error("error in downloading jar", zap.Error(err))
		return err
	}
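Two side notes on the download path above, offered as hedged suggestions rather than required changes. First, `resp.Body.Read(respBody)` on a zero-length slice reads zero bytes, so the constructed error will never carry the response body; `io.ReadAll` actually drains it. Second, the standard library's `(*http.Request).SetBasicAuth` produces the same `Authorization: Basic ...` header as the hand-rolled base64 closure, and `cookiejar.New(nil)` is the documented constructor (a zero-value `cookiejar.Jar` is uninitialized). A sketch that would slot into `Download()`, reusing the `jarFile` fields from the diff:

req, err := http.NewRequest(http.MethodGet, jarFile.uri, nil)
if err != nil {
	jarFile.delete()
	return err
}
// SetBasicAuth writes the same "Authorization: Basic <base64(user:pass)>"
// header as the manual closure above.
req.SetBasicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword)
resp, err := http.DefaultClient.Do(req)
if err != nil {
	jarFile.delete()
	return err
}
defer resp.Body.Close()
if resp.StatusCode > 299 {
	// io.ReadAll drains the body; Read into an empty slice reads nothing.
	body, _ := io.ReadAll(resp.Body)
	return errors.New(string(body) + " status:" + resp.Status)
}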
@ -10,16 +10,11 @@ import (
)

func (job *ManagedJob) Cycle() {
	pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
	// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))

	// Init job
	if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
		job.run(false)
		return
	}

	if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused {
		job.run(true)
	if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
		job.Run(false)
		return
	}

@ -41,6 +36,12 @@ func (job *ManagedJob) Cycle() {
	if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
		return
	}

	if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
		job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
		job.crd.SetJobStatus(job.def.UID, job.def.Status)
		return
	}
	// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
	// 	//job.restore()
	// 	return
@ -50,5 +51,5 @@ func (job *ManagedJob) Cycle() {
	// 	return
	// }

	pkg.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus)))
	pkg.Logger.Warn("[managed-job] [cycle] unhandled job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus)), zap.String("namespace", job.def.Namespace))
}
@ -2,7 +2,6 @@ package managed_job

import (
	"flink-kube-operator/internal/jar"

	"flink-kube-operator/pkg"

	"go.uber.org/zap"
@ -10,7 +9,7 @@ import (

// upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error {
	jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
	jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
	if err != nil {
		pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
		return err
@ -30,3 +29,16 @@ func (job *ManagedJob) upload() error {
	})
	return nil
}

func (job *ManagedJob) RemoveJar() {
	if job.def.Status.JarId != nil {
		err := job.client.DeleteJar(*job.def.Status.JarId)
		pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
		err = job.crd.Patch(job.def.UID, map[string]interface{}{
			"status": map[string]interface{}{
				"jarId": nil,
			},
		})
		pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
	}
}
@ -1,9 +1,8 @@
package manager
package managed_job

import (
	"flink-kube-operator/internal/crd"
	"flink-kube-operator/internal/crd/v1alpha1"
	"flink-kube-operator/internal/managed_job"
	"time"

	"flink-kube-operator/pkg"
@ -16,16 +15,22 @@ import (

type Manager struct {
	client *api.Client
	managedJobs map[types.UID]managed_job.ManagedJob
	managedJobs       map[types.UID]ManagedJob
	processingJobsIds []types.UID
}

var mgr Manager

func GetManager() Manager {
	return mgr
}

func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
	ticker := time.NewTicker(5 * time.Second)
	quit := make(chan struct{})
	mgr := Manager{
	mgr = Manager{
		client: client,
		managedJobs: map[types.UID]managed_job.ManagedJob{},
		managedJobs:       map[types.UID]ManagedJob{},
		processingJobsIds: []types.UID{},
	}

@ -41,6 +46,18 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
			}
		}
	}()

	go func() {
		for event := range crd.FinalizerChannel {
			manager := mgr.GetJob(event)
			if manager != nil {
				err := manager.Stop()
				pkg.Logger.Info("[finalizer]", zap.Error(err))
				delete(mgr.managedJobs, event)
			}
		}
	}()

	return mgr
}

@ -54,7 +71,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
			},
		})
	}
	//pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews))
	pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobManagerJobOverviews))

	// Loop over job definitions as Kubernetes CRD
	for _, uid := range crd.GetAllJobKeys() {
@ -68,26 +85,26 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
		mgr.processingJobsIds = append(mgr.processingJobsIds, uid)

		// Check if job exists in manager managed jobs
		managedJob, ok := mgr.managedJobs[uid]
		if ok {
		managedJob, jobFound := mgr.managedJobs[uid]
		if jobFound {
			managedJob.Update(def)
		} else {
			// Add job to manager managed job
			managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
			managedJob = *NewManagedJob(client, def, crdInstance)
		}
		if jobManagerJobStatusError != nil {

		}

		jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
		jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
			jobId := managedJob.GetJobId()
			if jobId != nil {
				return job.ID == *jobId
			}
			return false
		})
		if ok {
			pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
		if jobFound {
			// pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
			patchStatusObj := map[string]interface{}{
				"jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
			}
@ -99,6 +116,16 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
			crdInstance.Patch(uid, map[string]interface{}{
				"status": patchStatusObj,
			})
		} else {
			// TODO handle job not found status
			// patchStatusObj := map[string]interface{}{
			// 	"jobStatus":       "",
			// 	"lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
			// }

			// crdInstance.Patch(uid, map[string]interface{}{
			// 	"status": patchStatusObj,
			// })
		}

		managedJob.Cycle()
@ -109,3 +136,12 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
	})
}

func (mgr *Manager) GetJobs() map[types.UID]ManagedJob {
	return mgr.managedJobs
}

func (mgr *Manager) GetJob(id types.UID) *ManagedJob {
	job := mgr.managedJobs[id]
	return &job
}
@ -10,7 +10,7 @@ import (
	"go.uber.org/zap"
)

func (job *ManagedJob) pause() error {
func (job *ManagedJob) Pause() error {
	var err error
	if job.def.Status.JobId != nil {
		result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
@ -33,7 +33,7 @@ func (job *ManagedJob) pause() error {
	if savepointPath != "" {
		job.def.Status.LastSavepointPath = &savepointPath
		job.def.Status.PauseSavepointTriggerId = nil
		job.def.Status.JobStatus = ""
		job.def.Status.JobStatus = "FINISHED"
		job.def.Status.LastSavepointPath = &savepointPath
		lastSavepointDate := time.Now()
		job.def.Status.LastSavepointDate = &lastSavepointDate
@ -1,5 +1,11 @@
package managed_job

func (job *ManagedJob) Stop() {
	job.client.StopJob(*job.def.Status.JobId)
import "errors"

func (job *ManagedJob) Stop() error {
	if job.def.Status.JobId != nil {
		return job.client.StopJob(*job.def.Status.JobId)
	} else {
		return errors.New("job Id not found")
	}
}
@ -11,8 +8,8 @@ import (
	"go.uber.org/zap"
)

// run the job from savepoint and jarId in managedJob
func (job *ManagedJob) run(restoreMode bool) error {
// Run the job from savepoint and jarId in managedJob
func (job *ManagedJob) Run(restoreMode bool) error {
	var savepointPath string
	if job.def.Status.LastSavepointPath == nil {
		pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
		AllowNonRestoredState: true,
		EntryClass:            job.def.Spec.EntryClass,
		SavepointPath:         savepointPath,
		Parallelism:           job.def.Spec.Parallelism,
		ProgramArg:            job.def.Spec.Args,
	})
	if err == nil {
		pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
		jobId = &runJarResp.JobId
		break
	} else {
		if strings.ContainsAny(err.Error(), ".jar does not exist") {
		if strings.Contains(err.Error(), ".jar does not exist") {
			pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
			shouldUpload = true
		} else {
			pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
			stringErr := err.Error()
			job.def.Status.Error = &stringErr
			job.def.Status.JobStatus = ""
			job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
			job.crd.SetJobStatus(job.def.UID, job.def.Status)
			return v1alpha1.ErrOnStartingJob
		}
	}
}
@ -66,9 +75,9 @@ func (job *ManagedJob) run(restoreMode bool) error {
	})
	return nil
}
	shouldUpload = false
	continue
}
return nil
}

// job.def.Status.JobId = &runJarResp.JobId
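The switch from `strings.ContainsAny` to `strings.Contains` here is a real bug fix, not a rename: `ContainsAny(s, chars)` reports whether any single character from `chars` appears in `s`, so the old check matched nearly any error message containing a letter such as `a` or `j`, while `Contains` requires the whole substring. A self-contained illustration:

package main

import (
	"fmt"
	"strings"
)

func main() {
	msg := "org.apache.flink: something else entirely"
	fmt.Println(strings.ContainsAny(msg, ".jar does not exist")) // true: any one of these characters suffices
	fmt.Println(strings.Contains(msg, ".jar does not exist"))    // false: the full substring must appear
}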
@ -17,14 +17,14 @@ func (job ManagedJob) createSavepoint() error {
		pkg.Logger.Debug("[managed-job] [savepoint] no job id")
		return v1alpha1.ErrNoJobId
	}
	pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
	pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("name", job.def.GetName()), zap.String("interval", job.def.Spec.SavepointInterval.String()))
	resp, err := job.client.SavePoints(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
	if err != nil {
		pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
		return err
	}
	pkg.Logger.Debug("[managed-job] [savepoint] savepoint created successfully", zap.String("trigger-id", resp.RequestID))

	job.def.Status.SavepointTriggerId = &resp.RequestID
	job.crd.Patch(job.def.UID, map[string]interface{}{
		"status": map[string]interface{}{
			"savepointTriggerId": resp.RequestID,
@ -80,3 +80,16 @@ func (job ManagedJob) trackSavepoint() error {

	return nil
}

func (job ManagedJob) TriggerSavepoint() error {
	err := job.createSavepoint()
	if err != nil {
		return err
	}
	err = job.trackSavepoint()
	return err
}

func (job ManagedJob) GetLastSavepointPath() *string {
	return job.def.Status.LastSavepointPath
}
@ -18,7 +18,7 @@ func (job *ManagedJob) upgrade() {
			"jarId": job.def.Status.JarId,
		},
	})
	err := job.pause()
	err := job.Pause()
	if err != nil {
		pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err))
		return
@ -30,7 +30,7 @@ func (job *ManagedJob) upgrade() {
		zap.Error(err),
	)

	err = job.run(true)
	err = job.Run(true)
	if err != nil {
		pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err))
		return
46 internal/rest/base.go Normal file
@ -0,0 +1,46 @@
package rest

import (
	"fmt"
	"log"

	"github.com/danielgtaylor/huma/v2"
	humaFiber "github.com/danielgtaylor/huma/v2/adapters/humafiber"
	"github.com/gofiber/fiber/v2"
)

func Init() {
	app := fiber.New()
	app.Use(func(c *fiber.Ctx) error {
		// Logic to execute before the next handler
		fmt.Printf("Request Method: %s, URL: %s\n", c.Method(), c.OriginalURL())

		// Call the next handler in the stack
		err := c.Next()

		// Logic to execute after the next handler
		fmt.Println("Request completed")

		return err
	})
	config := huma.DefaultConfig("Go API", "1.0.0")
	config.Servers = []*huma.Server{{}}
	config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{
		"auth": {
			Type:         "http",
			Scheme:       "bearer",
			BearerFormat: "JWT",
		},
	}
	api := humaFiber.New(app, config)
	api.UseMiddleware(
		func(ctx huma.Context, next func(huma.Context)) {
			ctx = huma.WithValue(ctx, "humaContext", ctx)
			next(ctx)
		},
	)

	initRouter(api)

	log.Fatal(app.Listen(fmt.Sprintf(":%s", "3000")))
}
253 internal/rest/controller/crd.go Normal file
@ -0,0 +1,253 @@
package controller

import (
	"archive/zip"
	"bufio"
	"context"
	"errors"
	"flink-kube-operator/internal/crd"
	"flink-kube-operator/internal/crd/v1alpha1"
	"flink-kube-operator/internal/managed_job"
	"flink-kube-operator/internal/rest/types"
	"flink-kube-operator/pkg"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"

	"github.com/danielgtaylor/huma/v2"
	"go.uber.org/zap"
	k8sTypes "k8s.io/apimachinery/pkg/types"
)

type GetJobsReq struct {
}

type GetJobsResp struct {
	Body []v1alpha1.FlinkJob
}

func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
	jobs := []v1alpha1.FlinkJob{}
	for _, key := range crd.GetAllJobKeys() {
		job := crd.GetJob(key)
		job.ManagedFields = nil
		jobs = append(jobs, job)
	}
	return &GetJobsResp{Body: jobs}, nil
}

type StopJobReq struct {
	JobUId string `path:"uid"`
}

type StopJobRespBody struct {
	Success bool `json:"success"`
}

type StopJobResp struct {
	Body StopJobRespBody
}

func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	err := job.Stop()
	if err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{
		Success: true,
	}}, nil
}

func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	err := job.Run(false)
	if err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{
		Success: true,
	}}, nil
}

func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	err := job.Run(true)
	if err != nil {
		return nil, err
	}
	return &StopJobResp{Body: StopJobRespBody{
		Success: true,
	}}, nil
}

func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	job.RemoveJar()
	return &StopJobResp{Body: StopJobRespBody{
		Success: true,
	}}, nil
}

func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	job.Pause()
	return &StopJobResp{Body: StopJobRespBody{
		Success: true,
	}}, nil
}

type JobTriggerSavepointReq struct {
	JobUId string `path:"uid"`
}

type JobTriggerSavepointRespBody struct {
	Success bool `json:"success"`
}

type JobTriggerSavepointResp struct {
	Body JobTriggerSavepointRespBody
}

func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUId))
	err := job.TriggerSavepoint()
	if err != nil {
		return nil, err
	}
	return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{
		Success: true,
	}}, nil
}

func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
	mgr := managed_job.GetManager()
	job := mgr.GetJob(k8sTypes.UID(req.JobUID))
	lastSavepointPath := job.GetLastSavepointPath()

	if lastSavepointPath == nil {
		return nil, huma.Error404NotFound("there is no savepoint path is registered for the job")
	}
	folderPath := strings.TrimLeft(*lastSavepointPath, "file:") // Change this to your folder path

	// Create a temporary zip file
	zipFilePath, err := filepath.Abs("./savepoint.zip")

	pkg.Logger.Debug("[controller] [savepoint]",
		zap.String("zipFileName", zipFilePath),
		zap.String("folderPath", folderPath),
	)

	// Create the zip file
	zipFile, err := os.Create(zipFilePath)
	if err != nil {
		fmt.Println("Error creating zip file:", err)
		return nil, nil
	}
	defer zipFile.Close()

	// Create a new zip writer
	zipWriter := zip.NewWriter(zipFile)
	defer zipWriter.Close()

	// Walk through the source directory and add files to the zip
	err = filepath.Walk(folderPath, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// Create a new file header
		header, err := zip.FileInfoHeader(info)
		if err != nil {
			return err
		}

		// Set the header name to the relative path
		header.Name, err = filepath.Rel(folderPath, filePath)
		if err != nil {
			return err
		}

		// If it's a directory, add a trailing slash
		if info.IsDir() {
			header.Name += "/"
		} else {
			// Set the compression method
			header.Method = zip.Deflate
		}

		// Create a new writer for the file
		writer, err := zipWriter.CreateHeader(header)
		if err != nil {
			return err
		}

		// If it's a directory, we're done
		if info.IsDir() {
			return nil
		}

		// Open the file to be zipped
		file, err := os.Open(filePath)
		if err != nil {
			return err
		}
		defer file.Close()

		// Copy the file content to the zip writer
		_, err = io.Copy(writer, file)
		return err
	})

	// Open the zip file for reading
	zipFileReader, err := os.Open(zipFilePath)
	if err != nil {
		return nil, fmt.Errorf("failed to open zip file: %w", err)
	}
	//defer zipFileReader.Close()

	fileInfo, err := zipFileReader.Stat()
	if err != nil {
		return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
	}

	resp := &huma.StreamResponse{
		Body: func(ctx huma.Context) {
			ctx.SetHeader("Content-Type", "application/zip")
			ctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
			ctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%s", zipFilePath))
			writer := ctx.BodyWriter()
			br := bufio.NewReader(zipFileReader)
			for {

				b, err := br.ReadByte()

				if err != nil && !errors.Is(err, io.EOF) {
					fmt.Println(err)
					break
				}

				// process the one byte b

				if err != nil {
					// end of file
					break
				}
				writer.Write([]byte{b})
			}

			os.Remove(zipFilePath)

		},
	}

	return resp, nil
}
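Two details in `DownloadSavepoint` are worth flagging. `strings.TrimLeft(p, "file:")` strips any leading run of the characters `f`, `i`, `l`, `e`, `:` rather than the literal prefix, so a savepoint directory whose name starts with one of those letters would be mangled; `strings.TrimPrefix` matches the whole prefix. And the byte-at-a-time copy loop can be replaced by `io.Copy`, which streams through an internal buffer. A hedged sketch of the streaming body under those substitutions, reusing the identifiers from the handler above:

folderPath := strings.TrimPrefix(*lastSavepointPath, "file:") // trims the literal prefix only

resp := &huma.StreamResponse{
	Body: func(ctx huma.Context) {
		ctx.SetHeader("Content-Type", "application/zip")
		ctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
		ctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%s", zipFilePath))
		// io.Copy uses a buffered copy instead of one write per byte.
		if _, err := io.Copy(ctx.BodyWriter(), zipFileReader); err != nil {
			fmt.Println(err)
		}
		zipFileReader.Close()
		os.Remove(zipFilePath)
	},
}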
82 internal/rest/router.go Normal file
@ -0,0 +1,82 @@
package rest

import (
	"flink-kube-operator/internal/rest/controller"
	"net/http"

	"github.com/danielgtaylor/huma/v2"
)

func initRouter(api huma.API) {
	huma.Register(api, huma.Operation{
		OperationID: "get-jobs",
		Method:      http.MethodGet,
		Path:        "/jobs",
		Summary:     "Get Jobs",
		Description: "Get Flink Jobs",
		Tags:        []string{"Job"},
	}, controller.GetJobs)

	huma.Register(api, huma.Operation{
		OperationID: "stop-job",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/stop",
		Summary:     "Stop Job",
		Description: "Stop Flink Job",
		Tags:        []string{"Job"},
	}, controller.StopJob)

	huma.Register(api, huma.Operation{
		OperationID: "start-job",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/start",
		Summary:     "Start Job",
		Description: "Start Flink Job",
		Tags:        []string{"Job"},
	}, controller.StartJob)

	huma.Register(api, huma.Operation{
		OperationID: "resume-job",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/resume",
		Summary:     "Resume Job",
		Description: "Resume Flink Job",
		Tags:        []string{"Job"},
	}, controller.ResumeJob)

	huma.Register(api, huma.Operation{
		OperationID: "remove-jar",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/remove-jar",
		Summary:     "Remove Job Jar",
		Description: "Remove Flink Job Jar",
		Tags:        []string{"Job"},
	}, controller.RemoveJobJar)

	huma.Register(api, huma.Operation{
		OperationID: "pause-job",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/pause",
		Summary:     "Pause Job",
		Description: "Pause Flink Job",
		Tags:        []string{"Job"},
	}, controller.PauseJob)

	huma.Register(api, huma.Operation{
		OperationID: "download-savepoint",
		Method:      http.MethodGet,
		Path:        "/savepoint/download",
		Summary:     "Download Savepoint",
		Description: "Download Savepoint",
		Tags:        []string{"Savepoint"},
	}, controller.DownloadSavepoint)

	huma.Register(api, huma.Operation{
		OperationID: "trigger-savepoint",
		Method:      http.MethodPost,
		Path:        "/jobs/{uid}/trigger-savepoint",
		Summary:     "Trigger Savepoint",
		Description: "Trigger Savepoint",
		Tags:        []string{"Savepoint"},
	}, controller.TriggerSavepoint)
}
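To exercise these routes, a client only needs the FlinkJob's Kubernetes UID in the path. A minimal sketch against a locally running operator; the port comes from `app.Listen(":3000")` in base.go, and the UID is a placeholder:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	uid := "0e4cbf43-1111-2222-3333-444444444444" // placeholder FlinkJob UID
	url := fmt.Sprintf("http://localhost:3000/jobs/%s/trigger-savepoint", uid)
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // expects {"success":true} on 200
}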
12 internal/rest/types/savepoint.go Normal file
@ -0,0 +1,12 @@
package types

type SavepointDownloadReq struct {
	JobUID string `query:"jobUID"`
}

type SavepointDownloadResp struct {
	ContentType        string `header:"Content-Type"`
	ContentDisposition string `header:"Content-Disposition"`
	ContentLength      string `header:"Content-Length"`
	Body               []byte
}
@ -2,6 +2,8 @@ package pkg

import (
	"context"
	"os"
	"strconv"

	"github.com/mattn/go-colorable"
	"go.uber.org/zap"
@ -82,8 +84,10 @@ func OverrideLoggerConfig(config LoggerConfig) {
	Logger = createOrUpdateInstance(config)
}

var level, err = strconv.Atoi(os.Getenv("LOG_LEVEL"))

var Logger = GetLogger(context.Background(), LoggerConfig{
	Level: zap.DebugLevel,
	Level:    zapcore.Level(level),
	Filename: "./tmp/error.log",
	MaxSize:  100,
	MaxAge:   90,
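Note that `zapcore.Level` is a signed integer (Debug is -1, Info 0, Warn 1, Error 2), so `LOG_LEVEL=-1` enables debug output, while an unset or malformed `LOG_LEVEL` makes `strconv.Atoi` return 0 plus an error that the code above ignores, silently defaulting to Info. A hedged sketch of a parse that makes the default explicit, using only the imports already present in this file:

func levelFromEnv() zapcore.Level {
	raw := os.Getenv("LOG_LEVEL")
	if raw == "" {
		return zapcore.InfoLevel // explicit default when the variable is unset
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return zapcore.InfoLevel // fall back instead of silently using 0
	}
	return zapcore.Level(n)
}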