Skip to content

Commit c29c37c

Browse files
authored
Introduce ManagedBy field in RunPolicy (#650)
* Introduce ManageBy field to RunPolicy Signed-off-by: Michal Szadkowski <[email protected]> * Make mpi-operator a default value for ManagedBy Signed-off-by: Michal Szadkowski <[email protected]> * Add validation for ManagedBy field Signed-off-by: Michal Szadkowski <[email protected]> * Make use of ManagedBy in reconciliation process Signed-off-by: Michal Szadkowski <[email protected]> * Regenerate code after adding managedBy field Signed-off-by: Michal Szadkowski <[email protected]> * Add e2e tests Signed-off-by: Michal Szadkowski <[email protected]> * Update after code review Signed-off-by: Michal Szadkowski <[email protected]> * Update tests Signed-off-by: Michal Szadkowski <[email protected]> * Remove default value for ManagedBy Signed-off-by: Michal Szadkowski <[email protected]> * Add optional tag Replace backoff and consistently with sleep Signed-off-by: Michal Szadkowski <[email protected]> * Create common util package for integration and e2e tests with sleep/wait constants Signed-off-by: Michal Szadkowski <[email protected]> --------- Signed-off-by: Michal Szadkowski <[email protected]>
1 parent b77f630 commit c29c37c

File tree

16 files changed

+339
-20
lines changed

16 files changed

+339
-20
lines changed

deploy/v2beta1/mpi-operator.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7757,6 +7757,17 @@ spec:
77577757
CleanPodPolicy defines the policy to kill pods after the job completes.
77587758
Default to Running.
77597759
type: string
7760+
managedBy:
7761+
description: |-
7762+
ManagedBy is used to indicate the controller or entity that manages a MPIJob.
7763+
The value must be either an empty, 'kubeflow.org/mpi-operator' or
7764+
'kueue.x-k8s.io/multikueue'.
7765+
The mpi-operator reconciles a MPIJob which doesn't have this
7766+
field at all or the field value is the reserved string
7767+
'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
7768+
with 'kueue.x-k8s.io/multikueue' to the Kueue.
7769+
The field is immutable.
7770+
type: string
77607771
schedulingPolicy:
77617772
description: SchedulingPolicy defines the policy related to scheduling,
77627773
e.g. gang-scheduling

manifests/base/kubeflow.org_mpijobs.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7734,6 +7734,17 @@ spec:
77347734
CleanPodPolicy defines the policy to kill pods after the job completes.
77357735
Default to Running.
77367736
type: string
7737+
managedBy:
7738+
description: |-
7739+
ManagedBy is used to indicate the controller or entity that manages a MPIJob.
7740+
The value must be either an empty, 'kubeflow.org/mpi-operator' or
7741+
'kueue.x-k8s.io/multikueue'.
7742+
The mpi-operator reconciles a MPIJob which doesn't have this
7743+
field at all or the field value is the reserved string
7744+
'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
7745+
with 'kueue.x-k8s.io/multikueue' to the Kueue.
7746+
The field is immutable.
7747+
type: string
77377748
schedulingPolicy:
77387749
description: SchedulingPolicy defines the policy related to scheduling,
77397750
e.g. gang-scheduling

pkg/apis/kubeflow/v2beta1/swagger.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@
241241
"description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.",
242242
"type": "string"
243243
},
244+
"managedBy": {
245+
"description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
246+
"type": "string"
247+
},
244248
"schedulingPolicy": {
245249
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
246250
"$ref": "#/definitions/v2beta1.SchedulingPolicy"

pkg/apis/kubeflow/v2beta1/types.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ type SchedulingPolicy struct {
9393
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
9494
}
9595

96+
const (
97+
// KubeflowJobController represents the value of the default job controller
98+
KubeflowJobController = "kubeflow.org/mpi-operator"
99+
100+
// MultiKueueController represents the vaue of the MultiKueue controller
101+
MultiKueueController = "kueue.x-k8s.io/multikueue"
102+
)
103+
96104
// RunPolicy encapsulates various runtime policies of the distributed training
97105
// job, for example how to clean up resources and how long the job can stay
98106
// active.
@@ -131,6 +139,17 @@ type RunPolicy struct {
131139
// Defaults to false.
132140
// +kubebuilder:default:=false
133141
Suspend *bool `json:"suspend,omitempty"`
142+
143+
// ManagedBy is used to indicate the controller or entity that manages a MPIJob.
144+
// The value must be either an empty, 'kubeflow.org/mpi-operator' or
145+
// 'kueue.x-k8s.io/multikueue'.
146+
// The mpi-operator reconciles a MPIJob which doesn't have this
147+
// field at all or the field value is the reserved string
148+
// 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
149+
// with 'kueue.x-k8s.io/multikueue' to the Kueue.
150+
// The field is immutable.
151+
// +optional
152+
ManagedBy *string `json:"managedBy,omitempty"`
134153
}
135154

136155
type LauncherCreationPolicy string

pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/kubeflow/validation/validation.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@ var (
3939

4040
validRestartPolicies = sets.NewString(
4141
string(kubeflow.RestartPolicyNever),
42-
string(kubeflow.RestartPolicyOnFailure),
43-
)
42+
string(kubeflow.RestartPolicyOnFailure))
43+
44+
validManagedBy = sets.NewString(
45+
string(kubeflow.MultiKueueController),
46+
string(kubeflow.KubeflowJobController))
4447
)
4548

4649
func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList {
@@ -98,6 +101,11 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error
98101
if policy.BackoffLimit != nil {
99102
errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*policy.BackoffLimit), path.Child("backoffLimit"))...)
100103
}
104+
if policy.ManagedBy != nil {
105+
if !validManagedBy.Has(*policy.ManagedBy) {
106+
errs = append(errs, field.NotSupported(path.Child("managedBy"), *policy.ManagedBy, validManagedBy.List()))
107+
}
108+
}
101109
return errs
102110
}
103111

pkg/apis/kubeflow/validation/validation_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ func TestValidateMPIJob(t *testing.T) {
193193
TTLSecondsAfterFinished: ptr.To[int32](-1),
194194
ActiveDeadlineSeconds: ptr.To[int64](-1),
195195
BackoffLimit: ptr.To[int32](-1),
196+
ManagedBy: ptr.To("invalid.com/controller"),
196197
},
197198
SSHAuthMountPath: "/root/.ssh",
198199
MPIImplementation: kubeflow.MPIImplementation("Unknown"),
@@ -239,6 +240,10 @@ func TestValidateMPIJob(t *testing.T) {
239240
Type: field.ErrorTypeInvalid,
240241
Field: "spec.runPolicy.backoffLimit",
241242
},
243+
{
244+
Type: field.ErrorTypeNotSupported,
245+
Field: "spec.runPolicy.managedBy",
246+
},
242247
{
243248
Type: field.ErrorTypeNotSupported,
244249
Field: "spec.mpiImplementation",

pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller/mpi_job_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,11 @@ func (c *MPIJobController) syncHandler(key string) error {
582582
// Set default for the new mpiJob.
583583
scheme.Scheme.Default(mpiJob)
584584

585+
if manager := managedByExternalController(mpiJob.Spec.RunPolicy.ManagedBy); manager != nil {
586+
klog.V(2).Info("Skipping MPIJob managed by a custom controller", "managed-by", manager)
587+
return nil
588+
}
589+
585590
// for mpi job that is terminating, just return.
586591
if mpiJob.DeletionTimestamp != nil {
587592
return nil
@@ -1722,3 +1727,10 @@ func truncateMessage(message string) string {
17221727
suffix := "..."
17231728
return message[:eventMessageLimit-len(suffix)] + suffix
17241729
}
1730+
1731+
func managedByExternalController(controllerName *string) *string {
1732+
if controllerName != nil && *controllerName != kubeflow.KubeflowJobController {
1733+
return controllerName
1734+
}
1735+
return nil
1736+
}

pkg/controller/mpi_job_controller_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) {
418418
f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d))
419419
}
420420

421+
func (f *fixture) expectNoKubeActions() bool {
422+
k8sActions := filterInformerActions(f.kubeClient.Actions())
423+
return len(k8sActions) == 0
424+
}
425+
421426
func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) {
422427
action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob)
423428
action.Subresource = "status"
@@ -504,6 +509,21 @@ func TestDoNothingWithInvalidMPIJob(t *testing.T) {
504509
f.run(getKey(mpiJob, t))
505510
}
506511

512+
func TestDoNothingWithMPIJobManagedExternally(t *testing.T) {
513+
f := newFixture(t, "")
514+
var replicas int32 = 1
515+
startTime := metav1.Now()
516+
completionTime := metav1.Now()
517+
mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime)
518+
mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationOpenMPI
519+
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
520+
f.setUpMPIJob(mpiJob)
521+
f.run(getKey(mpiJob, t))
522+
if !f.expectNoKubeActions() {
523+
t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)")
524+
}
525+
}
526+
507527
func TestAllResourcesCreated(t *testing.T) {
508528
impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH}
509529
for _, implementation := range impls {

sdk/python/v2beta1/docs/V2beta1RunPolicy.md

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py

Lines changed: 29 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)