diff --git a/crds.yaml b/crds.yaml index 191d083..c044377 100644 --- a/crds.yaml +++ b/crds.yaml @@ -36,6 +36,10 @@ spec: type: integer jarUri: type: string + args: + type: array + items: + type: string savepointInterval: type: string format: duration diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 550dbdf..930da8c 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -16,6 +16,7 @@ type FlinkJobSpec struct { JarURI string `json:"jarUri"` SavepointInterval metaV1.Duration `json:"savepointInterval"` EntryClass string `json:"entryClass"` + Args []string `json:"args"` } type FlinkJobStatus struct { diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index 4f3553b..9801f5b 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -42,6 +42,8 @@ func (job *ManagedJob) run(restoreMode bool) error { AllowNonRestoredState: true, EntryClass: job.def.Spec.EntryClass, SavepointPath: savepointPath, + Parallelism: job.def.Spec.Parallelism, + ProgramArg: job.def.Spec.Args, }) if err == nil { pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))