Compare commits
No commits in common. "main" and "feature/kube-crd-control" have entirely different histories.
main
...
feature/ku
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@ -10,7 +10,7 @@
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"env": {
|
||||
"FLINK_API_URL": "flink.bz2:8081",
|
||||
"FLINK_API_URL": "127.0.0.1:8081",
|
||||
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
||||
},
|
||||
"cwd": "${workspaceFolder}",
|
||||
|
||||
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@ -8,13 +8,12 @@
|
||||
"flink",
|
||||
"gitea",
|
||||
"gonanoid",
|
||||
"huma",
|
||||
"logicamp",
|
||||
"Namespaceable",
|
||||
"nindent",
|
||||
"reactivex",
|
||||
"repsert",
|
||||
"taskmanager",
|
||||
"rxgo",
|
||||
"tolerations"
|
||||
]
|
||||
}
|
||||
@ -1,8 +1,8 @@
|
||||
FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
|
||||
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build
|
||||
|
||||
ARG upx_version=4.2.4
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \
|
||||
curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
|
||||
cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
|
||||
chmod +x /usr/local/bin/upx && \
|
||||
@ -20,14 +20,13 @@ COPY . .
|
||||
|
||||
# Build
|
||||
ENV GOCACHE=/root/.cache/go-build
|
||||
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
|
||||
&& upx -q -9 /flink-kube-operator
|
||||
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
|
||||
RUN upx -q -5 /flink-kube-operator
|
||||
|
||||
FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
|
||||
|
||||
|
||||
COPY --from=build /flink-kube-operator /flink-kube-operator
|
||||
COPY --from=build /etc/ssl/certs /etc/ssl/certs
|
||||
|
||||
EXPOSE 8083
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17
|
||||
FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17
|
||||
|
||||
# Set working directory
|
||||
WORKDIR /opt/flink
|
||||
@ -15,19 +15,8 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh
|
||||
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
|
||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
|
||||
|
||||
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
|
||||
CMD ["bin/start-cluster.sh"]
|
||||
@ -1,6 +0,0 @@
|
||||
Installation:
|
||||
|
||||
```bash
|
||||
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
|
||||
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
|
||||
```
|
||||
@ -2,8 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/crd"
|
||||
"flink-kube-operator/internal/managed_job"
|
||||
"flink-kube-operator/internal/rest"
|
||||
"flink-kube-operator/internal/manager"
|
||||
"flink-kube-operator/pkg"
|
||||
"log"
|
||||
"os"
|
||||
@ -31,7 +30,7 @@ func main() {
|
||||
pkg.Logger.Info("[main]", zap.Any("cluster-config", clusterConfig))
|
||||
|
||||
// init flink job manager
|
||||
managed_job.NewManager(c, crdInstance)
|
||||
manager.NewManager(c, crdInstance)
|
||||
|
||||
// for _, jobDef := range config.Jobs {
|
||||
// managed_job.NewManagedJob(c, db, jobDef)
|
||||
@ -49,8 +48,6 @@ func main() {
|
||||
// // pkg.Logger.Debug("[main]", zap.Any("job", job))
|
||||
// }
|
||||
|
||||
go rest.Init()
|
||||
|
||||
cancelChan := make(chan os.Signal, 1)
|
||||
sig := <-cancelChan
|
||||
log.Printf("Caught signal %v", sig)
|
||||
|
||||
@ -36,14 +36,6 @@ spec:
|
||||
type: integer
|
||||
jarUri:
|
||||
type: string
|
||||
jarURIBasicAuthUsername:
|
||||
type: string
|
||||
jarURIBasicAuthPassword:
|
||||
type: string
|
||||
args:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
savepointInterval:
|
||||
type: string
|
||||
format: duration
|
||||
|
||||
@ -3,14 +3,13 @@ apiVersion: flink.logicamp.tech/v1alpha1
|
||||
kind: FlinkJob
|
||||
metadata:
|
||||
name: my-flink-job
|
||||
namespace: default
|
||||
spec:
|
||||
key: word-count
|
||||
name: "Word Count Example"
|
||||
entryClass: "tech.logicamp.logiline.FacilityEnrichment"
|
||||
parallelism: 1
|
||||
jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
|
||||
jarURIBasicAuthUsername: logiline-actrunner
|
||||
jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong
|
||||
entryClass: "org.apache.flink.examples.java.wordcount.WordCount"
|
||||
parallelism: 2
|
||||
jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar"
|
||||
flinkConfiguration:
|
||||
taskmanager.numberOfTaskSlots: "1"
|
||||
parallelism.default: "1"
|
||||
taskmanager.numberOfTaskSlots: "2"
|
||||
parallelism.default: "2"
|
||||
19
go.mod
19
go.mod
@ -3,9 +3,8 @@ module flink-kube-operator
|
||||
go 1.23.2
|
||||
|
||||
require (
|
||||
github.com/danielgtaylor/huma/v2 v2.27.0
|
||||
github.com/gofiber/fiber/v2 v2.52.6
|
||||
github.com/logi-camp/go-flink-client v0.2.0
|
||||
github.com/matoous/go-nanoid/v2 v2.1.0
|
||||
github.com/samber/lo v1.47.0
|
||||
go.uber.org/zap v1.27.0
|
||||
k8s.io/apimachinery v0.31.3
|
||||
@ -13,7 +12,6 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
@ -23,20 +21,16 @@ require (
|
||||
github.com/google/gnostic-models v0.6.9 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.17.11 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.16 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.58.0 // indirect
|
||||
github.com/valyala/tcplisten v1.0.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
|
||||
google.golang.org/protobuf v1.35.1 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/emirpasic/gods v1.12.0 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
@ -49,12 +43,17 @@ require (
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/reactivex/rxgo/v2 v2.5.0
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/stretchr/testify v1.9.0 // indirect
|
||||
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/net v0.31.0 // indirect
|
||||
golang.org/x/oauth2 v0.21.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/sys v0.27.0 // indirect
|
||||
golang.org/x/term v0.26.0 // indirect
|
||||
golang.org/x/text v0.20.0 // indirect
|
||||
golang.org/x/time v0.6.0 // indirect
|
||||
|
||||
51
go.sum
51
go.sum
@ -1,14 +1,15 @@
|
||||
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/danielgtaylor/huma/v2 v2.27.0 h1:yxgJ8GqYqKeXw/EnQ4ZNc2NBpmn49AlhxL2+ksSXjUI=
|
||||
github.com/danielgtaylor/huma/v2 v2.27.0/go.mod h1:NbSFXRoOMh3BVmiLJQ9EbUpnPas7D9BeOxF/pZBAGa0=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
|
||||
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
|
||||
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
|
||||
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
|
||||
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
|
||||
@ -27,8 +28,6 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
|
||||
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
|
||||
github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI=
|
||||
github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
@ -53,8 +52,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
@ -64,17 +62,15 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
|
||||
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
|
||||
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/matoous/go-nanoid/v2 v2.1.0 h1:P64+dmq21hhWdtvZfEAofnvJULaRR1Yib0+PnU669bE=
|
||||
github.com/matoous/go-nanoid/v2 v2.1.0/go.mod h1:KlbGNQ+FhrUNIHUxZdL63t7tl4LaPkZNpUULS8H4uVM=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
|
||||
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
@ -91,9 +87,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/reactivex/rxgo/v2 v2.5.0 h1:FhPgHwX9vKdNQB2gq9EPt+EKk9QrrzoeztGbEEnZam4=
|
||||
github.com/reactivex/rxgo/v2 v2.5.0/go.mod h1:bs4fVZxcb5ZckLIOeIeVH942yunJLWDABWGbrHAW+qU=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
|
||||
@ -103,24 +98,22 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasthttp v1.58.0 h1:GGB2dWxSbEprU9j0iMJHgdKYJVDyjrOwF9RE59PbRuE=
|
||||
github.com/valyala/fasthttp v1.58.0/go.mod h1:SYXvHHaFp7QZHGKSHmoMipInhrI5StHrhDTYVEjK/Kw=
|
||||
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
|
||||
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
|
||||
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 h1:BLNsFR8l/hj/oGjnJXkd4Vi3s4kQD3/3x8HSAE4bzN0=
|
||||
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775/go.mod h1:XUZ4x3oGhWfiOnUvTslnKKs39AWUct3g3yJvXTQSJOQ=
|
||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
|
||||
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||
@ -132,8 +125,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
|
||||
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@ -145,13 +140,15 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
|
||||
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
|
||||
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@ -161,6 +158,8 @@ golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
|
||||
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
|
||||
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
@ -173,6 +172,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
|
||||
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
|
||||
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
|
||||
@ -181,6 +181,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
24
helm/Chart.yaml
Normal file
24
helm/Chart.yaml
Normal file
@ -0,0 +1,24 @@
|
||||
apiVersion: v2
|
||||
name: flink-kube-operator
|
||||
description: A Helm chart for Kubernetes
|
||||
|
||||
# A chart can be either an 'application' or a 'library' chart.
|
||||
#
|
||||
# Application charts are a collection of templates that can be packaged into versioned archives
|
||||
# to be deployed.
|
||||
#
|
||||
# Library charts provide useful utilities or functions for the chart developer. They're included as
|
||||
# a dependency of application charts to inject those utilities and functions into the rendering
|
||||
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
|
||||
type: application
|
||||
|
||||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.1.0
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "1.16.0"
|
||||
@ -1,6 +0,0 @@
|
||||
dependencies:
|
||||
- name: minio
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 16.0.2
|
||||
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
|
||||
generated: "2025-04-04T14:42:09.771390014+03:30"
|
||||
@ -1,6 +0,0 @@
|
||||
apiVersion: v2
|
||||
name: flink-kube-operator
|
||||
description: Helm chart for flink kube operator
|
||||
type: application
|
||||
version: 1.2.3
|
||||
appVersion: "0.1.1"
|
||||
@ -1,12 +0,0 @@
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-checkpoint-pvc
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteMany
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.flink.state.checkpoint.size }}
|
||||
{{- end }}
|
||||
@ -1,43 +0,0 @@
|
||||
{{- define "flink.env" -}}
|
||||
- name: JOB_MANAGER_RPC_ADDRESS
|
||||
value: "localhost"
|
||||
- name: NAMESPACE
|
||||
value: {{ .Release.Namespace }}
|
||||
- name: FLINK_PROPERTIES
|
||||
value: |
|
||||
jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
|
||||
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
|
||||
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
|
||||
taskmanager.data.port: 6125
|
||||
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
|
||||
parallelism.default: {{ .Values.flink.parallelism.default }}
|
||||
state.backend: {{ .Values.flink.state.backend }}
|
||||
rest.port: 8081
|
||||
rootLogger.level = DEBUG
|
||||
rootLogger.appenderRef.console.ref = ConsoleAppender
|
||||
high-availability.type: kubernetes
|
||||
kubernetes.namespace: {{ .Release.Namespace }}
|
||||
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
|
||||
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
|
||||
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
state.checkpoints.dir: file:///opt/flink/checkpoints/
|
||||
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
|
||||
state.checkpoints.dir: s3://flink/checkpoints/
|
||||
{{- end }}
|
||||
state.backend.rocksdb.localdir: /opt/flink/rocksdb
|
||||
high-availability.storageDir: /opt/flink/ha
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
state.savepoints.dir: file:///opt/flink/savepoints/
|
||||
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
|
||||
state.savepoints.dir: s3://flink/savepoints/
|
||||
{{- end }}
|
||||
state.backend.incremental: {{ .Values.flink.state.incremental }}
|
||||
rest.profiling.enabled: true
|
||||
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
|
||||
s3.endpoint: http://{{ .Release.Name }}-minio:9000
|
||||
s3.path.style.access: true
|
||||
{{- end }}
|
||||
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
|
||||
|
||||
{{- end }}
|
||||
@ -1,10 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-ha-pvc
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteOnce
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.flink.state.ha.size }} # Use size defined in values.yaml
|
||||
@ -1,102 +0,0 @@
|
||||
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-job-manager
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-job-manager
|
||||
spec:
|
||||
replicas: 1
|
||||
strategy:
|
||||
type: Recreate
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-job-manager
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-job-manager
|
||||
spec:
|
||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
initContainers:
|
||||
- name: volume-mount-hack
|
||||
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
|
||||
runAsUser: 0
|
||||
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
|
||||
volumeMounts:
|
||||
- name: flink-ha
|
||||
mountPath: {{ .Values.flink.state.ha.dir }}
|
||||
containers:
|
||||
- name: jobmanager
|
||||
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
|
||||
imagePullPolicy: Always
|
||||
args: ["jobmanager"]
|
||||
ports:
|
||||
- containerPort: 6123 # JobManager RPC port
|
||||
name: rpc
|
||||
- containerPort: 6124 # JobManager blob server port
|
||||
name: blob
|
||||
- containerPort: 6125 # JobManager queryable state port
|
||||
name: query
|
||||
- containerPort: 8081 # JobManager Web UI port
|
||||
name: ui
|
||||
env:
|
||||
{{- include "flink.env" . | nindent 12 }}
|
||||
- name: POD_IP
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
|
||||
- name: S3_ENDPOINT
|
||||
value: "http://minio-service:9000"
|
||||
- name: AWS_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-flink-secrets
|
||||
key: minio_access_key
|
||||
- name: AWS_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-flink-secrets
|
||||
key: minio_secret_key
|
||||
{{- end }}
|
||||
volumeMounts:
|
||||
- name: flink-ha
|
||||
mountPath: {{ .Values.flink.state.ha.dir }}
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
- name: flink-checkpoint
|
||||
mountPath: /opt/flink/checkpoints
|
||||
{{- end }}
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
- name: flink-savepoint
|
||||
mountPath: /opt/flink/savepoints
|
||||
{{- end }}
|
||||
volumes:
|
||||
- name: flink-ha
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Release.Name }}-flink-ha-pvc
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
- name: flink-checkpoint
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
|
||||
{{- end }}
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
- name: flink-savepoint
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Release.Name }}-flink-savepoint-pvc
|
||||
{{- end }}
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.affinity }}
|
||||
affinity:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.tolerations }}
|
||||
tolerations:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
@ -1,28 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-job-manager
|
||||
labels:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
spec:
|
||||
ports:
|
||||
- name: flink-web-ui
|
||||
port: 8081
|
||||
targetPort: 8081
|
||||
- name: rpc
|
||||
port: 6123
|
||||
targetPort: 6123
|
||||
- name: blob
|
||||
port: 6124
|
||||
targetPort: 6124
|
||||
- name: query
|
||||
port: 6125
|
||||
targetPort: 6125
|
||||
- name: operator
|
||||
port: 3000
|
||||
targetPort: 3000
|
||||
selector:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-job-manager
|
||||
type: ClusterIP # Change to LoadBalancer if you want external access
|
||||
@ -1,12 +0,0 @@
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-savepoint-pvc
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteMany
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.flink.state.savepoint.size }}
|
||||
{{- end }}
|
||||
@ -1,80 +0,0 @@
|
||||
|
||||
apiVersion: apps/v1
|
||||
kind: StatefulSet
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-task-manager
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: taskmanager
|
||||
spec:
|
||||
serviceName: {{ .Release.Name }}-flink-task-manager
|
||||
replicas: {{ .Values.flink.taskManager.replicas }}
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-task-manager
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-task-manager
|
||||
spec:
|
||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
containers:
|
||||
- name: task-manager
|
||||
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
|
||||
imagePullPolicy: Always
|
||||
args: ["taskmanager"]
|
||||
env:
|
||||
{{- include "flink.env" . | nindent 8 }}
|
||||
- name: POD_IP
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: status.podIP
|
||||
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
|
||||
- name: S3_ENDPOINT
|
||||
value: "http://minio-service:9000"
|
||||
- name: AWS_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-flink-secrets
|
||||
key: minio_access_key
|
||||
- name: AWS_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-flink-secrets
|
||||
key: minio_secret_key
|
||||
{{- end }}
|
||||
volumeMounts:
|
||||
- name: rocksdb-storage
|
||||
mountPath: /opt/flink/rocksdb
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
- name: flink-checkpoint
|
||||
mountPath: /opt/flink/checkpoints
|
||||
{{- end }}
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
- name: flink-savepoint
|
||||
mountPath: /opt/flink/savepoints
|
||||
{{- end }}
|
||||
resources:
|
||||
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
|
||||
volumes:
|
||||
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
|
||||
- name: flink-checkpoint
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
|
||||
{{- end }}
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
|
||||
- name: flink-savepoint
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Release.Name }}-flink-savepoint-pvc
|
||||
{{- end }}
|
||||
|
||||
volumeClaimTemplates:
|
||||
- metadata:
|
||||
name: rocksdb-storage
|
||||
spec:
|
||||
accessModes: [ ReadWriteOnce ]
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}
|
||||
@ -1,17 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-operator
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-operator
|
||||
spec:
|
||||
type: {{ .Values.service.type }}
|
||||
ports:
|
||||
- port: {{ .Values.service.port }}
|
||||
targetPort: http
|
||||
protocol: TCP
|
||||
name: http
|
||||
selector:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-operator
|
||||
@ -1,70 +0,0 @@
|
||||
|
||||
apiVersion: apps/v1
|
||||
kind: StatefulSet
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink-operator
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-operator
|
||||
spec:
|
||||
serviceName: {{ .Release.Name }}-flink-operator
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-operator
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{ .Release.Name }}-flink-operator
|
||||
component: {{ .Release.Name }}-flink-operator
|
||||
spec:
|
||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
initContainers:
|
||||
- name: wait-for-jobmanager
|
||||
image: curlimages/curl:8.5.0 # Lightweight curl image
|
||||
command:
|
||||
- sh
|
||||
- -c
|
||||
- |
|
||||
echo "Waiting for Flink JobManager to be ready..."
|
||||
until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
|
||||
echo "JobManager not ready yet - retrying in 5s..."
|
||||
sleep 5
|
||||
done
|
||||
echo "JobManager is ready!"
|
||||
containers:
|
||||
- name: operator
|
||||
securityContext:
|
||||
{{- toYaml .Values.securityContext | nindent 12 }}
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: {{ .Values.service.port }}
|
||||
protocol: TCP
|
||||
env:
|
||||
- name: FLINK_API_URL
|
||||
value: {{ .Release.Name }}-flink-job-manager:8081
|
||||
- name: NAMESPACE
|
||||
value: "{{ .Release.Namespace }}"
|
||||
{{- if eq .Values.flink.state.savepoint.storageType "s3" }}
|
||||
- name: SAVEPOINT_PATH
|
||||
value: s3://flink/savepoints/
|
||||
- name: S3_ENDPOINT
|
||||
value: "http://{{ .Release.Name }}-minio:9000"
|
||||
- name: AWS_ACCESS_KEY_ID
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-minio
|
||||
key: root-user
|
||||
- name: AWS_SECRET_ACCESS_KEY
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .Release.Name }}-minio
|
||||
key: root-password
|
||||
{{- else }}
|
||||
- name: SAVEPOINT_PATH
|
||||
value: /opt/flink/savepoints/
|
||||
{{- end }}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
240
helm/index.yaml
240
helm/index.yaml
@ -1,240 +0,0 @@
|
||||
apiVersion: v1
|
||||
entries:
|
||||
flink-kube-operator:
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-07-18T18:09:46.27166563+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 597f2c07884bb5411dcc6e1a9cdf7672977858efe30273a46fb6525eb6013091
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.2.3.tgz
|
||||
version: 1.2.3
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-05-17T14:34:55.317942453+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.2.2.tgz
|
||||
version: 1.2.2
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-05-17T14:01:29.891695937+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.2.1.tgz
|
||||
version: 1.2.1
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-05-17T12:47:25.848097207+03:30"
|
||||
dependencies:
|
||||
- name: minio
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 16.0.2
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.2.0.tgz
|
||||
version: 1.2.0
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-04-15T12:06:59.425538953+03:30"
|
||||
dependencies:
|
||||
- name: minio
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 16.0.2
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.1.2.tgz
|
||||
version: 1.1.2
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-04-12T23:13:39.394371646+03:30"
|
||||
dependencies:
|
||||
- name: minio
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 16.0.2
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.0.1.tgz
|
||||
version: 1.0.1
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.1
|
||||
created: "2025-04-06T01:52:09.478716316+03:30"
|
||||
dependencies:
|
||||
- name: minio
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 16.0.2
|
||||
description: Helm chart for flink kube operator
|
||||
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-1.0.0.tgz
|
||||
version: 1.0.0
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-04-04T13:50:27.971040367+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 00acef7878bcf372d036fabaac400913097d678087a756102b54a28428197bdf
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.14.tgz
|
||||
version: 0.1.14
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T23:13:19.698003661+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: d104b9242362415a7b920e4e2af975730e208ff73db17b8d2afd11ea8b78b4a2
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.13.tgz
|
||||
version: 0.1.13
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T23:13:19.697555829+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: f58802990389ecde00a49a442f6e83a007e281e972d07f2979657d2763fe94ba
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.12.tgz
|
||||
version: 0.1.12
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.491747333+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 0daa98c63b443018c2072a2d7448c972faff2274fb04433c613532b408cd3ab1
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.11.tgz
|
||||
version: 0.1.11
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.490697387+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: e091256eeb8640b61443cbe4781426ef493737ab0ac1145e568426bb2c1ed3ba
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.10.tgz
|
||||
version: 0.1.10
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.495842696+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.9.tgz
|
||||
version: 0.1.9
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.495392608+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.8.tgz
|
||||
version: 0.1.8
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.494948853+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.7.tgz
|
||||
version: 0.1.7
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.49450822+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.6.tgz
|
||||
version: 0.1.6
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.494040193+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.5.tgz
|
||||
version: 0.1.5
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.493584927+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.4.tgz
|
||||
version: 0.1.4
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.493138547+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.3.tgz
|
||||
version: 0.1.3
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.492696005+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.2.tgz
|
||||
version: 0.1.2
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.490170385+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.1.tgz
|
||||
version: 0.1.1
|
||||
- apiVersion: v2
|
||||
appVersion: 0.1.0
|
||||
created: "2025-03-04T18:04:35.489734651+03:30"
|
||||
description: Helm chart for flink kube operator
|
||||
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
|
||||
name: flink-kube-operator
|
||||
type: application
|
||||
urls:
|
||||
- flink-kube-operator-0.1.0.tgz
|
||||
version: 0.1.0
|
||||
generated: "2025-07-18T18:09:46.244672127+03:30"
|
||||
@ -17,6 +17,6 @@
|
||||
{{- else if contains "ClusterIP" .Values.service.type }}
|
||||
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
|
||||
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
|
||||
echo "Visit http://127.0.0.1:8081 to use your application"
|
||||
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
|
||||
echo "Visit http://127.0.0.1:8080 to use your application"
|
||||
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
|
||||
{{- end }}
|
||||
71
helm/templates/flink/deploy.yaml
Normal file
71
helm/templates/flink/deploy.yaml
Normal file
@ -0,0 +1,71 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
|
||||
labels:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
|
||||
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
spec:
|
||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
|
||||
containers:
|
||||
- name: flink
|
||||
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
|
||||
imagePullPolicy: Always
|
||||
ports:
|
||||
- containerPort: 8081 # JobManager Web UI port
|
||||
- containerPort: 6121 # TaskManager communication port
|
||||
- containerPort: 6122 # TaskManager communication port
|
||||
env:
|
||||
- name: JOB_MANAGER_RPC_ADDRESS
|
||||
value: "localhost" # JobManager and TaskManager in the same container
|
||||
- name: FLINK_PROPERTIES
|
||||
value: |
|
||||
jobmanager.rpc.address: localhost
|
||||
jobmanager.memory.process.size: 2048m
|
||||
taskmanager.memory.process.size: 2048m
|
||||
taskmanager.data.port: 6125
|
||||
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
|
||||
parallelism.default: {{ .Values.flink.parallelism.default }}
|
||||
state.backend: {{ .Values.flink.state.backend }}
|
||||
state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
|
||||
rest.port: 8081
|
||||
rootLogger.level = DEBUG
|
||||
rootLogger.appenderRef.console.ref = ConsoleAppender
|
||||
web.upload.dir: /opt/flink/data/web-upload
|
||||
state.checkpoints.dir: file:///tmp/flink-checkpoints
|
||||
high-availability.type: kubernetes
|
||||
high-availability.storageDir: file:///opt/flink/ha
|
||||
kubernetes.cluster-id: cluster-one
|
||||
kubernetes.namespace: {{ .Release.Namespace }}
|
||||
volumeMounts:
|
||||
- name: flink-data
|
||||
mountPath: /opt/flink/data
|
||||
subPath: data
|
||||
- name: flink-data
|
||||
mountPath: /opt/flink/web-upload
|
||||
subPath: web-upload
|
||||
- name: flink-savepoints
|
||||
mountPath: /opt/flink/savepoints
|
||||
- name: flink-savepoints
|
||||
mountPath: /opt/flink/ha
|
||||
subPath: ha
|
||||
|
||||
volumes:
|
||||
- name: flink-data
|
||||
emptyDir: {} # Temporary storage for internal data
|
||||
- name: flink-savepoints
|
||||
persistentVolumeClaim:
|
||||
claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
|
||||
10
helm/templates/flink/pvc.yaml
Normal file
10
helm/templates/flink/pvc.yaml
Normal file
@ -0,0 +1,10 @@
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteOnce
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml
|
||||
15
helm/templates/flink/service.yaml
Normal file
15
helm/templates/flink/service.yaml
Normal file
@ -0,0 +1,15 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: flink # Adding the flink prefix to the service name
|
||||
labels:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
spec:
|
||||
ports:
|
||||
- port: 8081
|
||||
targetPort: 8081
|
||||
selector:
|
||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
type: ClusterIP # Change to LoadBalancer if you want external access
|
||||
61
helm/templates/operator/deployment.yaml
Normal file
61
helm/templates/operator/deployment.yaml
Normal file
@ -0,0 +1,61 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{ include "flink-kube-operator.fullname" . }}
|
||||
labels:
|
||||
{{- include "flink-kube-operator.labels" . | nindent 4 }}
|
||||
spec:
|
||||
{{- if not .Values.autoscaling.enabled }}
|
||||
replicas: {{ .Values.replicaCount }}
|
||||
{{- end }}
|
||||
selector:
|
||||
matchLabels:
|
||||
{{- include "flink-kube-operator.selectorLabels" . | nindent 6 }}
|
||||
template:
|
||||
metadata:
|
||||
{{- with .Values.podAnnotations }}
|
||||
annotations:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
labels:
|
||||
{{- include "flink-kube-operator.labels" . | nindent 8 }}
|
||||
{{- with .Values.podLabels }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
spec:
|
||||
{{- with .Values.imagePullSecrets }}
|
||||
imagePullSecrets:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
securityContext:
|
||||
{{- toYaml .Values.podSecurityContext | nindent 8 }}
|
||||
containers:
|
||||
- name: {{ .Chart.Name }}
|
||||
securityContext:
|
||||
{{- toYaml .Values.securityContext | nindent 12 }}
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: {{ .Values.service.port }}
|
||||
protocol: TCP
|
||||
env:
|
||||
- name: FLINK_API_URL
|
||||
value: {{ .Values.config.flinkApiUrl }}
|
||||
resources:
|
||||
{{- toYaml .Values.resources | nindent 12 }}
|
||||
|
||||
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.affinity }}
|
||||
affinity:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.tolerations }}
|
||||
tolerations:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
@ -2,6 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: Role
|
||||
metadata:
|
||||
name: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||
namespace: {{ .Release.Namespace }} # Namespace where the role is created
|
||||
labels:
|
||||
{{- include "flink-kube-operator.labels" . | nindent 4 }}
|
||||
rules:
|
||||
15
helm/templates/operator/service.yaml
Normal file
15
helm/templates/operator/service.yaml
Normal file
@ -0,0 +1,15 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ include "flink-kube-operator.fullname" . }}
|
||||
labels:
|
||||
{{- include "flink-kube-operator.labels" . | nindent 4 }}
|
||||
spec:
|
||||
type: {{ .Values.service.type }}
|
||||
ports:
|
||||
- port: {{ .Values.service.port }}
|
||||
targetPort: http
|
||||
protocol: TCP
|
||||
name: http
|
||||
selector:
|
||||
{{- include "flink-kube-operator.selectorLabels" . | nindent 4 }}
|
||||
@ -38,7 +38,8 @@ podAnnotations: {}
|
||||
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
|
||||
podLabels: {}
|
||||
|
||||
podSecurityContext: {} # fsGroup: 2000
|
||||
podSecurityContext: {}
|
||||
# fsGroup: 2000
|
||||
|
||||
securityContext: {}
|
||||
# capabilities:
|
||||
@ -53,7 +54,7 @@ service:
|
||||
# This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
|
||||
type: ClusterIP
|
||||
# This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
|
||||
port: 3000
|
||||
port: 80
|
||||
|
||||
# This block is for setting up the ingress for more information can be found here: https://kubernetes.io/docs/concepts/services-networking/ingress/
|
||||
ingress:
|
||||
@ -63,10 +64,10 @@ ingress:
|
||||
# kubernetes.io/ingress.class: nginx
|
||||
# kubernetes.io/tls-acme: "true"
|
||||
hosts:
|
||||
- host: chart-example.local
|
||||
paths:
|
||||
- path: /
|
||||
pathType: ImplementationSpecific
|
||||
- host: chart-example.local
|
||||
paths:
|
||||
- path: /
|
||||
pathType: ImplementationSpecific
|
||||
tls: []
|
||||
# - secretName: chart-example-tls
|
||||
# hosts:
|
||||
@ -105,6 +106,7 @@ autoscaling:
|
||||
config:
|
||||
flinkApiUrl: flink:8081
|
||||
|
||||
|
||||
nodeSelector: {}
|
||||
|
||||
tolerations: []
|
||||
@ -115,44 +117,22 @@ affinity: {}
|
||||
flink:
|
||||
image:
|
||||
repository: lcr.logicamp.tech/library/flink
|
||||
tag: 1.20.1-scala_2.12-java17-minicluster
|
||||
tag: 1.20.0-scala_2.12-java17-minicluster
|
||||
|
||||
parallelism:
|
||||
default: 1 # Default parallelism for Flink jobs
|
||||
default: 1 # Default parallelism for Flink jobs
|
||||
|
||||
state:
|
||||
backend: rocksdb # Use RocksDB for state backend
|
||||
incremental: true
|
||||
ha:
|
||||
dir: "/opt/flink/ha" # Directory to store ha data
|
||||
pvcName: flink-ha-pvc # PVC for ha
|
||||
size: 10Gi # PVC size for ha
|
||||
checkpoint:
|
||||
storageType: s3 # s3 / filesystem
|
||||
interval: 5min
|
||||
mode: EXACTLY_ONCE
|
||||
size: 8Gi
|
||||
savepoint:
|
||||
storageType: s3
|
||||
size: 8Gi
|
||||
|
||||
jobManager:
|
||||
processMemory: 4096m # Size of job manager process memory
|
||||
|
||||
properties:
|
||||
jobmanager.rpc.timeout: 300s
|
||||
backend: rocksdb # Use RocksDB for state backend
|
||||
savepoints:
|
||||
dir: "file:///opt/flink/savepoints" # Directory to store savepoints
|
||||
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
|
||||
|
||||
taskManager:
|
||||
numberOfTaskSlots: 12 # Number of task slots for task manager
|
||||
processMemory: 4096m # Size of task manager process memory
|
||||
replicas: 1 # Number of task manager replicas
|
||||
storage:
|
||||
rocksDb:
|
||||
size: 4Gi
|
||||
resources:
|
||||
limits:
|
||||
cpu: 3
|
||||
memory: 4Gi
|
||||
requests:
|
||||
cpu: 1
|
||||
memory: 2Gi
|
||||
numberOfTaskSlots: 100 # Number of task slots for TaskManager
|
||||
|
||||
persistence:
|
||||
enabled: true
|
||||
size: 10Gi # PVC size for savepoints storage
|
||||
accessModes:
|
||||
- ReadWriteOnce
|
||||
@ -1,61 +1,14 @@
|
||||
package crd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
"flink-kube-operator/pkg"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"github.com/reactivex/rxgo/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
)
|
||||
|
||||
var FinalizerChannel chan (types.UID) = make(chan (types.UID))
|
||||
|
||||
func (crd Crd) manageFinalizer(jobEventChannel chan FlinkJobCrdEvent) {
|
||||
|
||||
finalizerName := "flink-operator.logicamp.tech/finalizer"
|
||||
for jobEvent := range jobEventChannel {
|
||||
pkg.Logger.Debug("[crd] [manage-finalizer] main loop", zap.String("name", jobEvent.Job.Name))
|
||||
go func() {
|
||||
if jobEvent.Job.GetDeletionTimestamp() != nil {
|
||||
// Resource is being deleted
|
||||
if controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
|
||||
// Perform cleanup
|
||||
pkg.Logger.Debug("[finalizer] stopping managed job", zap.String("name", jobEvent.Job.GetName()))
|
||||
if err := crd.cleanupResources(jobEvent.Job); err != nil {
|
||||
pkg.Logger.Info("[crd] [manage-finalizer] cleanup failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// Remove finalizer
|
||||
controllerutil.RemoveFinalizer(jobEvent.Job, finalizerName)
|
||||
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
|
||||
pkg.Logger.Info("[crd] [manage-finalizer] failed to remove finalizer", zap.Error(err))
|
||||
return
|
||||
}
|
||||
pkg.Logger.Debug("[crd] [manage-finalizer] job removed", zap.String("name", jobEvent.Job.GetName()))
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Add finalizer if not present
|
||||
if !controllerutil.ContainsFinalizer(jobEvent.Job, finalizerName) {
|
||||
controllerutil.AddFinalizer(jobEvent.Job, finalizerName)
|
||||
pkg.Logger.Debug("[finalizer] adding job")
|
||||
// Update the resource to add the finalizer
|
||||
if err := crd.runtimeClient.Update(context.Background(), jobEvent.Job); err != nil {
|
||||
pkg.Logger.Info("[finalizer] failed to add", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
|
||||
for j := range jobEventObservable.Observe() {
|
||||
jobEvent := j.V.(*FlinkJobCrdEvent)
|
||||
//pkg.Logger.Debug("[crd] [manage-finalizer] adding finalizer for", zap.String("name", jobEvent.Job.GetName()))
|
||||
controllerutil.AddFinalizer(jobEvent.Job, "")
|
||||
}
|
||||
}
|
||||
|
||||
func (crd Crd) cleanupResources(job *v1alpha1.FlinkJob) error {
|
||||
FinalizerChannel <- job.GetUID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -2,59 +2,44 @@ package crd
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
"os"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/config"
|
||||
)
|
||||
|
||||
type Crd struct {
|
||||
client dynamic.ResourceInterface
|
||||
runtimeClient client.Client
|
||||
client dynamic.NamespaceableResourceInterface
|
||||
}
|
||||
|
||||
func New() *Crd {
|
||||
// Get Kubernetes config_
|
||||
config_, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
|
||||
// Get Kubernetes config
|
||||
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
|
||||
if err != nil {
|
||||
config_, err = rest.InClusterConfig()
|
||||
config, err = rest.InClusterConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create dynamic client
|
||||
dynamicClient, err := dynamic.NewForConfig(config_)
|
||||
dynamicClient, err := dynamic.NewForConfig(config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
v1alpha1.AddKnownTypes(scheme)
|
||||
// Get FlinkJob resource interface
|
||||
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR).Namespace(os.Getenv("NAMESPACE"))
|
||||
runtimeClient, err := client.New(config.GetConfigOrDie(), client.Options{
|
||||
Scheme: scheme,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
flinkJobClient := dynamicClient.Resource(v1alpha1.FlinkJobGVR)
|
||||
|
||||
crd := Crd{
|
||||
client: flinkJobClient,
|
||||
runtimeClient: runtimeClient,
|
||||
client: flinkJobClient,
|
||||
}
|
||||
|
||||
jobEventCh := make(chan FlinkJobCrdEvent)
|
||||
|
||||
// add finalizer to new resources
|
||||
go crd.manageFinalizer(jobEventCh)
|
||||
|
||||
// Watch for FlinkJob creation
|
||||
crd.watchFlinkJobs(jobEventCh)
|
||||
jobEventObservable := crd.watchFlinkJobs()
|
||||
|
||||
// add finalizer to new resources
|
||||
go crd.manageFinalizer(jobEventObservable)
|
||||
|
||||
return &crd
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@ import (
|
||||
|
||||
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
||||
job := GetJob(jobUid)
|
||||
// pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
|
||||
|
||||
patchBytes, err := json.Marshal(patchData)
|
||||
if err != nil {
|
||||
@ -23,6 +22,7 @@ func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error
|
||||
|
||||
// Patch the status sub-resource
|
||||
unstructuredJob, err := crd.client.
|
||||
Namespace(job.GetNamespace()).
|
||||
Patch(
|
||||
context.Background(),
|
||||
job.GetName(),
|
||||
|
||||
@ -14,10 +14,6 @@ func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
|
||||
jobs.Store(job.GetUID(), job)
|
||||
}
|
||||
|
||||
func (crd *Crd) remove(uid types.UID) {
|
||||
jobs.Delete(uid)
|
||||
}
|
||||
|
||||
func GetJob(uid types.UID) v1alpha1.FlinkJob {
|
||||
job, _ := jobs.Load(uid)
|
||||
return *job.DeepCopy()
|
||||
|
||||
@ -10,15 +10,12 @@ import (
|
||||
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
|
||||
|
||||
type FlinkJobSpec struct {
|
||||
Key string `json:"key"`
|
||||
Name string `json:"name"`
|
||||
Parallelism int `json:"parallelism"`
|
||||
JarURI string `json:"jarUri"`
|
||||
JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
|
||||
JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
|
||||
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
||||
EntryClass string `json:"entryClass"`
|
||||
Args []string `json:"args"`
|
||||
Key string `json:"key"`
|
||||
Name string `json:"name"`
|
||||
Parallelism int `json:"parallelism"`
|
||||
JarURI string `json:"jarUri"`
|
||||
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
||||
EntryClass string `json:"entryClass"`
|
||||
}
|
||||
|
||||
type FlinkJobStatus struct {
|
||||
@ -57,7 +54,6 @@ var (
|
||||
ErrNoJarId = errors.New("[managed-job] no jar id")
|
||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
|
||||
)
|
||||
|
||||
type JobStatus string
|
||||
|
||||
@ -20,11 +20,11 @@ var FlinkJobGVR = schema.GroupVersionResource{
|
||||
}
|
||||
|
||||
var (
|
||||
SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes)
|
||||
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
|
||||
AddToScheme = SchemeBuilder.AddToScheme
|
||||
)
|
||||
|
||||
func AddKnownTypes(scheme *runtime.Scheme) error {
|
||||
func addKnownTypes(scheme *runtime.Scheme) error {
|
||||
scheme.AddKnownTypes(SchemeGroupVersion,
|
||||
&FlinkJob{},
|
||||
&FlinkJobList{},
|
||||
|
||||
@ -3,10 +3,10 @@ package crd
|
||||
import (
|
||||
"context"
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
"os"
|
||||
|
||||
"flink-kube-operator/pkg"
|
||||
|
||||
"github.com/reactivex/rxgo/v2"
|
||||
"go.uber.org/zap"
|
||||
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
@ -14,57 +14,52 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) {
|
||||
func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
||||
|
||||
ch := make(chan rxgo.Item)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
pkg.Logger.Debug("[crd] starting watch")
|
||||
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
namespace := os.Getenv("NAMESPACE")
|
||||
pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
|
||||
for event := range watcher.ResultChan() {
|
||||
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
||||
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
job := &v1alpha1.FlinkJob{}
|
||||
|
||||
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if job.Namespace != namespace {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
ch <- FlinkJobCrdEvent{
|
||||
EventType: event.Type,
|
||||
Job: job,
|
||||
}
|
||||
}()
|
||||
pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type)))
|
||||
switch event.Type {
|
||||
case watch.Bookmark:
|
||||
case watch.Modified:
|
||||
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||
crd.repsert(job)
|
||||
case watch.Added:
|
||||
//pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||
crd.repsert(job)
|
||||
case watch.Deleted:
|
||||
crd.remove(job.UID)
|
||||
}
|
||||
}
|
||||
defer watcher.Stop()
|
||||
pkg.Logger.Warn("[crd] [watch] Watcher stopped, restarting...")
|
||||
pkg.Logger.Debug("[crd] starting watch")
|
||||
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
for event := range watcher.ResultChan() {
|
||||
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
|
||||
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
||||
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("cannot create unstructured map", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
job := &v1alpha1.FlinkJob{}
|
||||
|
||||
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("cannot convert unstructured to structured", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
ch <- rxgo.Item{
|
||||
V: &FlinkJobCrdEvent{
|
||||
EventType: event.Type,
|
||||
Job: job,
|
||||
},
|
||||
}
|
||||
switch event.Type {
|
||||
case watch.Bookmark:
|
||||
case watch.Modified:
|
||||
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||
crd.repsert(job)
|
||||
case watch.Added:
|
||||
pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||
crd.repsert(job)
|
||||
case watch.Deleted:
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return rxgo.FromChannel(ch)
|
||||
}
|
||||
|
||||
@ -1,34 +1,27 @@
|
||||
package jar
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"flink-kube-operator/pkg"
|
||||
|
||||
api "github.com/logi-camp/go-flink-client"
|
||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type JarFile struct {
|
||||
uri string
|
||||
filePath string
|
||||
basicAuthUsername *string
|
||||
basicAuthPassword *string
|
||||
uri string
|
||||
filePath string
|
||||
}
|
||||
|
||||
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
|
||||
func NewJarFile(URI string) (*JarFile, error) {
|
||||
jarFile := &JarFile{
|
||||
uri: URI,
|
||||
basicAuthUsername: basicAuthUsername,
|
||||
basicAuthPassword: basicAuthPassword,
|
||||
uri: URI,
|
||||
}
|
||||
err := jarFile.Download()
|
||||
if err != nil {
|
||||
@ -53,9 +46,7 @@ func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err er
|
||||
}
|
||||
|
||||
func (jarFile *JarFile) Download() error {
|
||||
randBytes := make([]byte, 16)
|
||||
rand.Read(randBytes)
|
||||
fileName := hex.EncodeToString(randBytes)
|
||||
fileName, _ := gonanoid.New()
|
||||
jarFile.filePath = "/tmp/" + fileName + ".jar"
|
||||
out, err := os.Create(jarFile.filePath)
|
||||
if err != nil {
|
||||
@ -63,45 +54,9 @@ func (jarFile *JarFile) Download() error {
|
||||
}
|
||||
|
||||
defer out.Close()
|
||||
|
||||
var resp *http.Response
|
||||
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
|
||||
|
||||
basicAuth := func(username, password string) string {
|
||||
auth := username + ":" + password
|
||||
return base64.StdEncoding.EncodeToString([]byte(auth))
|
||||
}
|
||||
|
||||
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
|
||||
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
|
||||
return nil
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Jar: &cookiejar.Jar{},
|
||||
CheckRedirect: redirectPolicyFunc,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", jarFile.uri, nil)
|
||||
if err != nil {
|
||||
jarFile.delete()
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
|
||||
resp, err = client.Do(req)
|
||||
} else {
|
||||
resp, err = http.Get(jarFile.uri)
|
||||
}
|
||||
if err != nil {
|
||||
resp, err := http.Get(jarFile.uri)
|
||||
if err != nil || resp.StatusCode > 299 {
|
||||
jarFile.delete()
|
||||
pkg.Logger.Error("error in downloading jar", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
respBody := []byte{}
|
||||
resp.Body.Read(respBody)
|
||||
err = errors.New(string(respBody) + " status:" + resp.Status)
|
||||
pkg.Logger.Error("error in downloading jar", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -10,11 +10,16 @@ import (
|
||||
)
|
||||
|
||||
func (job *ManagedJob) Cycle() {
|
||||
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
|
||||
pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
|
||||
|
||||
// Init job
|
||||
if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
|
||||
job.Run(false)
|
||||
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
|
||||
job.run(false)
|
||||
return
|
||||
}
|
||||
|
||||
if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused {
|
||||
job.run(true)
|
||||
return
|
||||
}
|
||||
|
||||
@ -36,12 +41,6 @@ func (job *ManagedJob) Cycle() {
|
||||
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
|
||||
return
|
||||
}
|
||||
|
||||
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
|
||||
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||
return
|
||||
}
|
||||
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
||||
// //job.restore()
|
||||
// return
|
||||
@ -51,5 +50,5 @@ func (job *ManagedJob) Cycle() {
|
||||
// return
|
||||
// }
|
||||
|
||||
pkg.Logger.Warn("[managed-job] [cycle] unhandled job status", zap.String("name", job.def.Name), zap.String("status", string(job.def.Status.JobStatus)), zap.String("namespace", job.def.Namespace))
|
||||
pkg.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus)))
|
||||
}
|
||||
|
||||
@ -10,7 +10,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (job *ManagedJob) Pause() error {
|
||||
func (job *ManagedJob) pause() error {
|
||||
var err error
|
||||
if job.def.Status.JobId != nil {
|
||||
result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
|
||||
@ -33,7 +33,7 @@ func (job *ManagedJob) Pause() error {
|
||||
if savepointPath != "" {
|
||||
job.def.Status.LastSavepointPath = &savepointPath
|
||||
job.def.Status.PauseSavepointTriggerId = nil
|
||||
job.def.Status.JobStatus = "FINISHED"
|
||||
job.def.Status.JobStatus = ""
|
||||
job.def.Status.LastSavepointPath = &savepointPath
|
||||
lastSavepointDate := time.Now()
|
||||
job.def.Status.LastSavepointDate = &lastSavepointDate
|
||||
|
||||
@ -1,11 +1,5 @@
|
||||
package managed_job
|
||||
|
||||
import "errors"
|
||||
|
||||
func (job *ManagedJob) Stop() error {
|
||||
if job.def.Status.JobId != nil {
|
||||
return job.client.StopJob(*job.def.Status.JobId)
|
||||
} else {
|
||||
return errors.New("job Id not found")
|
||||
}
|
||||
func (job *ManagedJob) Stop() {
|
||||
job.client.StopJob(*job.def.Status.JobId)
|
||||
}
|
||||
|
||||
@ -11,8 +11,8 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Run the job from savepoint and jarId in managedJob
|
||||
func (job *ManagedJob) Run(restoreMode bool) error {
|
||||
// run the job from savepoint and jarId in managedJob
|
||||
func (job *ManagedJob) run(restoreMode bool) error {
|
||||
var savepointPath string
|
||||
if job.def.Status.LastSavepointPath == nil {
|
||||
pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
|
||||
@ -42,25 +42,16 @@ func (job *ManagedJob) Run(restoreMode bool) error {
|
||||
AllowNonRestoredState: true,
|
||||
EntryClass: job.def.Spec.EntryClass,
|
||||
SavepointPath: savepointPath,
|
||||
Parallelism: job.def.Spec.Parallelism,
|
||||
ProgramArg: job.def.Spec.Args,
|
||||
})
|
||||
if err == nil {
|
||||
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
||||
jobId = &runJarResp.JobId
|
||||
break
|
||||
} else {
|
||||
if strings.Contains(err.Error(), ".jar does not exist") {
|
||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||
if strings.ContainsAny(err.Error(), ".jar does not exist") {
|
||||
shouldUpload = true
|
||||
} else {
|
||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||
stringErr := err.Error()
|
||||
job.def.Status.Error = &stringErr
|
||||
job.def.Status.JobStatus = ""
|
||||
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||
return v1alpha1.ErrOnStartingJob
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -75,9 +66,9 @@ func (job *ManagedJob) Run(restoreMode bool) error {
|
||||
})
|
||||
return nil
|
||||
}
|
||||
shouldUpload = false
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// job.def.Status.JobId = &runJarResp.JobId
|
||||
|
||||
@ -17,14 +17,14 @@ func (job ManagedJob) createSavepoint() error {
|
||||
pkg.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||
return v1alpha1.ErrNoJobId
|
||||
}
|
||||
pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("name", job.def.GetName()), zap.String("interval", job.def.Spec.SavepointInterval.String()))
|
||||
pkg.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
|
||||
resp, err := job.client.SavePoints(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
pkg.Logger.Debug("[managed-job] [savepoint] savepoint created successfully", zap.String("trigger-id", resp.RequestID))
|
||||
job.def.Status.SavepointTriggerId = &resp.RequestID
|
||||
|
||||
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||
"status": map[string]interface{}{
|
||||
"savepointTriggerId": resp.RequestID,
|
||||
@ -80,16 +80,3 @@ func (job ManagedJob) trackSavepoint() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (job ManagedJob) TriggerSavepoint() error {
|
||||
err := job.createSavepoint()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = job.trackSavepoint()
|
||||
return err
|
||||
}
|
||||
|
||||
func (job ManagedJob) GetLastSavepointPath() *string {
|
||||
return job.def.Status.LastSavepointPath
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ func (job *ManagedJob) upgrade() {
|
||||
"jarId": job.def.Status.JarId,
|
||||
},
|
||||
})
|
||||
err := job.Pause()
|
||||
err := job.pause()
|
||||
if err != nil {
|
||||
pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err))
|
||||
return
|
||||
@ -30,7 +30,7 @@ func (job *ManagedJob) upgrade() {
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
err = job.Run(true)
|
||||
err = job.run(true)
|
||||
if err != nil {
|
||||
pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err))
|
||||
return
|
||||
|
||||
@ -2,6 +2,7 @@ package managed_job
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/jar"
|
||||
|
||||
"flink-kube-operator/pkg"
|
||||
|
||||
"go.uber.org/zap"
|
||||
@ -9,7 +10,7 @@ import (
|
||||
|
||||
// upload jar file and set the jarId for later usages
|
||||
func (job *ManagedJob) upload() error {
|
||||
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
|
||||
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
|
||||
if err != nil {
|
||||
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
|
||||
return err
|
||||
@ -29,16 +30,3 @@ func (job *ManagedJob) upload() error {
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (job *ManagedJob) RemoveJar() {
|
||||
if job.def.Status.JarId != nil {
|
||||
err := job.client.DeleteJar(*job.def.Status.JarId)
|
||||
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
|
||||
err = job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||
"status": map[string]interface{}{
|
||||
"jarId": nil,
|
||||
},
|
||||
})
|
||||
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
|
||||
}
|
||||
}
|
||||
@ -1,8 +1,9 @@
|
||||
package managed_job
|
||||
package manager
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/crd"
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
"flink-kube-operator/internal/managed_job"
|
||||
"time"
|
||||
|
||||
"flink-kube-operator/pkg"
|
||||
@ -15,22 +16,16 @@ import (
|
||||
|
||||
type Manager struct {
|
||||
client *api.Client
|
||||
managedJobs map[types.UID]ManagedJob
|
||||
managedJobs map[types.UID]managed_job.ManagedJob
|
||||
processingJobsIds []types.UID
|
||||
}
|
||||
|
||||
var mgr Manager
|
||||
|
||||
func GetManager() Manager {
|
||||
return mgr
|
||||
}
|
||||
|
||||
func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
quit := make(chan struct{})
|
||||
mgr = Manager{
|
||||
mgr := Manager{
|
||||
client: client,
|
||||
managedJobs: map[types.UID]ManagedJob{},
|
||||
managedJobs: map[types.UID]managed_job.ManagedJob{},
|
||||
processingJobsIds: []types.UID{},
|
||||
}
|
||||
|
||||
@ -46,18 +41,6 @@ func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for event := range crd.FinalizerChannel {
|
||||
manager := mgr.GetJob(event)
|
||||
if manager != nil {
|
||||
err := manager.Stop()
|
||||
pkg.Logger.Info("[finalizer]", zap.Error(err))
|
||||
delete(mgr.managedJobs, event)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return mgr
|
||||
}
|
||||
|
||||
@ -71,7 +54,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||
},
|
||||
})
|
||||
}
|
||||
pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobManagerJobOverviews))
|
||||
//pkg.Logger.Debug("[manager] [cycle] overviews", zap.Any("overviews", jobsOverviews))
|
||||
|
||||
// Loop over job definitions as Kubernetes CRD
|
||||
for _, uid := range crd.GetAllJobKeys() {
|
||||
@ -85,26 +68,26 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||
mgr.processingJobsIds = append(mgr.processingJobsIds, uid)
|
||||
|
||||
// Check if job exists in manager managed jobs
|
||||
managedJob, jobFound := mgr.managedJobs[uid]
|
||||
if jobFound {
|
||||
managedJob, ok := mgr.managedJobs[uid]
|
||||
if ok {
|
||||
managedJob.Update(def)
|
||||
} else {
|
||||
// Add job to manager managed job
|
||||
managedJob = *NewManagedJob(client, def, crdInstance)
|
||||
managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
|
||||
}
|
||||
if jobManagerJobStatusError != nil {
|
||||
|
||||
}
|
||||
|
||||
jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
|
||||
jobManagerJobOverview, ok := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
|
||||
jobId := managedJob.GetJobId()
|
||||
if jobId != nil {
|
||||
return job.ID == *jobId
|
||||
}
|
||||
return false
|
||||
})
|
||||
if jobFound {
|
||||
// pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
|
||||
if ok {
|
||||
pkg.Logger.Debug("[manager] read status from flink", zap.String("name", jobManagerJobOverview.Name), zap.String("state", jobManagerJobOverview.State))
|
||||
patchStatusObj := map[string]interface{}{
|
||||
"jobStatus": v1alpha1.JobStatus(jobManagerJobOverview.State),
|
||||
}
|
||||
@ -116,16 +99,6 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||
crdInstance.Patch(uid, map[string]interface{}{
|
||||
"status": patchStatusObj,
|
||||
})
|
||||
} else {
|
||||
// TODO handle job not found status
|
||||
// patchStatusObj := map[string]interface{}{
|
||||
// "jobStatus": "",
|
||||
// "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
|
||||
// }
|
||||
|
||||
// crdInstance.Patch(uid, map[string]interface{}{
|
||||
// "status": patchStatusObj,
|
||||
// })
|
||||
}
|
||||
|
||||
managedJob.Cycle()
|
||||
@ -136,12 +109,3 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *Manager) GetJobs() map[types.UID]ManagedJob {
|
||||
return mgr.managedJobs
|
||||
}
|
||||
|
||||
func (mgr *Manager) GetJob(id types.UID) *ManagedJob {
|
||||
job := mgr.managedJobs[id]
|
||||
return &job
|
||||
}
|
||||
@ -1,46 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/danielgtaylor/huma/v2"
|
||||
humaFiber "github.com/danielgtaylor/huma/v2/adapters/humafiber"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
app := fiber.New()
|
||||
app.Use(func(c *fiber.Ctx) error {
|
||||
// Logic to execute before the next handler
|
||||
fmt.Printf("Request Method: %s, URL: %s\n", c.Method(), c.OriginalURL())
|
||||
|
||||
// Call the next handler in the stack
|
||||
err := c.Next()
|
||||
|
||||
// Logic to execute after the next handler
|
||||
fmt.Println("Request completed")
|
||||
|
||||
return err
|
||||
})
|
||||
config := huma.DefaultConfig("Go API", "1.0.0")
|
||||
config.Servers = []*huma.Server{{}}
|
||||
config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{
|
||||
"auth": {
|
||||
Type: "http",
|
||||
Scheme: "bearer",
|
||||
BearerFormat: "JWT",
|
||||
},
|
||||
}
|
||||
api := humaFiber.New(app, config)
|
||||
api.UseMiddleware(
|
||||
func(ctx huma.Context, next func(huma.Context)) {
|
||||
ctx = huma.WithValue(ctx, "humaContext", ctx)
|
||||
next(ctx)
|
||||
},
|
||||
)
|
||||
|
||||
initRouter(api)
|
||||
|
||||
log.Fatal(app.Listen(fmt.Sprintf(":%s", "3000")))
|
||||
}
|
||||
@ -1,253 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"flink-kube-operator/internal/crd"
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
"flink-kube-operator/internal/managed_job"
|
||||
"flink-kube-operator/internal/rest/types"
|
||||
"flink-kube-operator/pkg"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/danielgtaylor/huma/v2"
|
||||
"go.uber.org/zap"
|
||||
k8sTypes "k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
type GetJobsReq struct {
|
||||
}
|
||||
|
||||
type GetJobsResp struct {
|
||||
Body []v1alpha1.FlinkJob
|
||||
}
|
||||
|
||||
func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
|
||||
jobs := []v1alpha1.FlinkJob{}
|
||||
for _, key := range crd.GetAllJobKeys() {
|
||||
job := crd.GetJob(key)
|
||||
job.ManagedFields = nil
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
return &GetJobsResp{Body: jobs}, nil
|
||||
}
|
||||
|
||||
type StopJobReq struct {
|
||||
JobUId string `path:"uid"`
|
||||
}
|
||||
|
||||
type StopJobRespBody struct {
|
||||
Success bool `json:"success"`
|
||||
}
|
||||
|
||||
type StopJobResp struct {
|
||||
Body StopJobRespBody
|
||||
}
|
||||
|
||||
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
err := job.Stop()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &StopJobResp{Body: StopJobRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
err := job.Run(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &StopJobResp{Body: StopJobRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func ResumeJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
err := job.Run(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &StopJobResp{Body: StopJobRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
job.RemoveJar()
|
||||
return &StopJobResp{Body: StopJobRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
job.Pause()
|
||||
return &StopJobResp{Body: StopJobRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
type JobTriggerSavepointReq struct {
|
||||
JobUId string `path:"uid"`
|
||||
}
|
||||
|
||||
type JobTriggerSavepointRespBody struct {
|
||||
Success bool `json:"success"`
|
||||
}
|
||||
|
||||
type JobTriggerSavepointResp struct {
|
||||
Body JobTriggerSavepointRespBody
|
||||
}
|
||||
|
||||
func TriggerSavepoint(ctx context.Context, req *JobTriggerSavepointReq) (*JobTriggerSavepointResp, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUId))
|
||||
err := job.TriggerSavepoint()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &JobTriggerSavepointResp{Body: JobTriggerSavepointRespBody{
|
||||
Success: true,
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func DownloadSavepoint(ctx context.Context, req *types.SavepointDownloadReq) (*huma.StreamResponse, error) {
|
||||
mgr := managed_job.GetManager()
|
||||
job := mgr.GetJob(k8sTypes.UID(req.JobUID))
|
||||
lastSavepointPath := job.GetLastSavepointPath()
|
||||
|
||||
if lastSavepointPath == nil {
|
||||
return nil, huma.Error404NotFound("there is no savepoint path is registered for the job")
|
||||
}
|
||||
folderPath := strings.TrimLeft(*lastSavepointPath, "file:") // Change this to your folder path
|
||||
|
||||
// Create a temporary zip file
|
||||
zipFilePath, err := filepath.Abs("./savepoint.zip")
|
||||
|
||||
pkg.Logger.Debug("[controller] [savepoint]",
|
||||
zap.String("zipFileName", zipFilePath),
|
||||
zap.String("folderPath", folderPath),
|
||||
)
|
||||
|
||||
// Create the zip file
|
||||
zipFile, err := os.Create(zipFilePath)
|
||||
if err != nil {
|
||||
fmt.Println("Error creating zip file:", err)
|
||||
return nil, nil
|
||||
}
|
||||
defer zipFile.Close()
|
||||
|
||||
// Create a new zip writer
|
||||
zipWriter := zip.NewWriter(zipFile)
|
||||
defer zipWriter.Close()
|
||||
|
||||
// Walk through the source directory and add files to the zip
|
||||
err = filepath.Walk(folderPath, func(filePath string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new file header
|
||||
header, err := zip.FileInfoHeader(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the header name to the relative path
|
||||
header.Name, err = filepath.Rel(folderPath, filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If it's a directory, add a trailing slash
|
||||
if info.IsDir() {
|
||||
header.Name += "/"
|
||||
} else {
|
||||
// Set the compression method
|
||||
header.Method = zip.Deflate
|
||||
}
|
||||
|
||||
// Create a new writer for the file
|
||||
writer, err := zipWriter.CreateHeader(header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If it's a directory, we're done
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Open the file to be zipped
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Copy the file content to the zip writer
|
||||
_, err = io.Copy(writer, file)
|
||||
return err
|
||||
})
|
||||
|
||||
// Open the zip file for reading
|
||||
zipFileReader, err := os.Open(zipFilePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open zip file: %w", err)
|
||||
}
|
||||
//defer zipFileReader.Close()
|
||||
|
||||
fileInfo, err := zipFileReader.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get info of zipped file: %w", err)
|
||||
}
|
||||
|
||||
resp := &huma.StreamResponse{
|
||||
Body: func(ctx huma.Context) {
|
||||
ctx.SetHeader("Content-Type", "application/zip")
|
||||
ctx.SetHeader("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
|
||||
ctx.SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%s", zipFilePath))
|
||||
writer := ctx.BodyWriter()
|
||||
br := bufio.NewReader(zipFileReader)
|
||||
for {
|
||||
|
||||
b, err := br.ReadByte()
|
||||
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
fmt.Println(err)
|
||||
break
|
||||
}
|
||||
|
||||
// process the one byte b
|
||||
|
||||
if err != nil {
|
||||
// end of file
|
||||
break
|
||||
}
|
||||
writer.Write([]byte{b})
|
||||
}
|
||||
|
||||
os.Remove(zipFilePath)
|
||||
|
||||
},
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
@ -1,82 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/rest/controller"
|
||||
"net/http"
|
||||
|
||||
"github.com/danielgtaylor/huma/v2"
|
||||
)
|
||||
|
||||
func initRouter(api huma.API) {
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "get-jobs",
|
||||
Method: http.MethodGet,
|
||||
Path: "/jobs",
|
||||
Summary: "Get Jobs",
|
||||
Description: "Get Flink Jobs",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.GetJobs)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "stop-job",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/stop",
|
||||
Summary: "Stop Job",
|
||||
Description: "Stop Flink Job",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.StopJob)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "start-job",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/start",
|
||||
Summary: "Start Job",
|
||||
Description: "Start Flink Job",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.StartJob)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "resume-job",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/resume",
|
||||
Summary: "Resume Job",
|
||||
Description: "Resume Flink Job",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.ResumeJob)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "remove-jar",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/remove-jar",
|
||||
Summary: "Remove Job Jar",
|
||||
Description: "Remove Flink Job Jar",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.RemoveJobJar)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "pause-job",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/pause",
|
||||
Summary: "Pause Job",
|
||||
Description: "Pause Flink Job",
|
||||
Tags: []string{"Job"},
|
||||
}, controller.PauseJob)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "download-savepoint",
|
||||
Method: http.MethodGet,
|
||||
Path: "/savepoint/download",
|
||||
Summary: "Download Savepoint",
|
||||
Description: "Download Savepoint",
|
||||
Tags: []string{"Savepoint"},
|
||||
}, controller.DownloadSavepoint)
|
||||
|
||||
huma.Register(api, huma.Operation{
|
||||
OperationID: "trigger-savepoint",
|
||||
Method: http.MethodPost,
|
||||
Path: "/jobs/{uid}/trigger-savepoint",
|
||||
Summary: "Trigger Savepoint",
|
||||
Description: "Trigger Savepoint",
|
||||
Tags: []string{"Savepoint"},
|
||||
}, controller.TriggerSavepoint)
|
||||
}
|
||||
@ -1,12 +0,0 @@
|
||||
package types
|
||||
|
||||
type SavepointDownloadReq struct {
|
||||
JobUID string `query:"jobUID"`
|
||||
}
|
||||
|
||||
type SavepointDownloadResp struct {
|
||||
ContentType string `header:"Content-Type"`
|
||||
ContentDisposition string `header:"Content-Disposition"`
|
||||
ContentLength string `header:"Content-Length"`
|
||||
Body []byte
|
||||
}
|
||||
@ -2,8 +2,6 @@ package pkg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/mattn/go-colorable"
|
||||
"go.uber.org/zap"
|
||||
@ -84,10 +82,8 @@ func OverrideLoggerConfig(config LoggerConfig) {
|
||||
Logger = createOrUpdateInstance(config)
|
||||
}
|
||||
|
||||
var level, err = strconv.Atoi(os.Getenv("LOG_LEVEL"))
|
||||
|
||||
var Logger = GetLogger(context.Background(), LoggerConfig{
|
||||
Level: zapcore.Level(level),
|
||||
Level: zap.DebugLevel,
|
||||
Filename: "./tmp/error.log",
|
||||
MaxSize: 100,
|
||||
MaxAge: 90,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user