feat(managed-job): add args and parallelism param to run flink job method

This commit is contained in:
Mohamad Khani 2024-12-18 11:08:21 +03:30
parent 07b8a36e63
commit 5bc047dbd1
3 changed files with 7 additions and 0 deletions

View File

@ -36,6 +36,10 @@ spec:
type: integer type: integer
jarUri: jarUri:
type: string type: string
args:
type: array
items:
type: string
savepointInterval: savepointInterval:
type: string type: string
format: duration format: duration

View File

@ -16,6 +16,7 @@ type FlinkJobSpec struct {
JarURI string `json:"jarUri"` JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"` SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"` EntryClass string `json:"entryClass"`
Args []string `json:"args"`
} }
type FlinkJobStatus struct { type FlinkJobStatus struct {

View File

@ -42,6 +42,8 @@ func (job *ManagedJob) run(restoreMode bool) error {
AllowNonRestoredState: true, AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass, EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath, SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArg: job.def.Spec.Args,
}) })
if err == nil { if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp)) pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))