Skip to content

Introduce ManagedBy field in RunPolicy #650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 10, 2024
11 changes: 11 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7757,6 +7757,17 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to Running.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either empty, 'kubeflow.org/mpi-operator' or
'kueue.x-k8s.io/multikueue'.
The mpi-operator reconciles a job which doesn't have this
field at all or whose value is the reserved string
'kubeflow.org/mpi-operator', but delegates reconciling the job
with 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
Expand Down
11 changes: 11 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7734,6 +7734,17 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to Running.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either empty, 'kubeflow.org/mpi-operator' or
'kueue.x-k8s.io/multikueue'.
The mpi-operator reconciles a job which doesn't have this
field at all or whose value is the reserved string
'kubeflow.org/mpi-operator', but delegates reconciling the job
with 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/kubeflow/v2beta1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func setDefaultsRunPolicy(policy *RunPolicy) {
if policy.CleanPodPolicy == nil {
policy.CleanPodPolicy = ptr.To(CleanPodPolicyNone)
}
if policy.ManagedBy == nil {
policy.ManagedBy = ptr.To(KubeflowJobController)
}
// The remaining fields are passed as-is to the k8s Job API, which does its
// own defaulting.
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow/v2beta1/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
SlotsPerWorker: ptr.To[int32](1),
RunPolicy: RunPolicy{
CleanPodPolicy: ptr.To(CleanPodPolicyNone),
ManagedBy: ptr.To(KubeflowJobController),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand All @@ -48,6 +49,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](2),
ActiveDeadlineSeconds: ptr.To[int64](3),
BackoffLimit: ptr.To[int32](4),
ManagedBy: ptr.To(MultiKueueController),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
Expand All @@ -62,6 +64,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](2),
ActiveDeadlineSeconds: ptr.To[int64](3),
BackoffLimit: ptr.To[int32](4),
ManagedBy: ptr.To(MultiKueueController),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationIntel,
Expand All @@ -78,6 +81,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](2),
ActiveDeadlineSeconds: ptr.To[int64](3),
BackoffLimit: ptr.To[int32](4),
ManagedBy: ptr.To(KubeflowJobController),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
Expand All @@ -92,6 +96,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](2),
ActiveDeadlineSeconds: ptr.To[int64](3),
BackoffLimit: ptr.To[int32](4),
ManagedBy: ptr.To(KubeflowJobController),
},
SSHAuthMountPath: "/home/mpiuser/.ssh",
MPIImplementation: MPIImplementationMPICH,
Expand All @@ -112,6 +117,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
SlotsPerWorker: ptr.To[int32](1),
RunPolicy: RunPolicy{
CleanPodPolicy: ptr.To(CleanPodPolicyNone),
ManagedBy: ptr.To(KubeflowJobController),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand All @@ -138,6 +144,7 @@ func TestSetDefaults_MPIJob(t *testing.T) {
SlotsPerWorker: ptr.To[int32](1),
RunPolicy: RunPolicy{
CleanPodPolicy: ptr.To(CleanPodPolicyNone),
ManagedBy: ptr.To(KubeflowJobController),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@
"description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.",
"type": "string"
},
"managedBy": {
"description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
"type": "string"
},
"schedulingPolicy": {
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/v2beta1.SchedulingPolicy"
Expand Down
18 changes: 18 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ type SchedulingPolicy struct {
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}

const (
// KubeflowJobController is the RunPolicy.ManagedBy value that names the
// mpi-operator itself; it is the value applied when the field is left unset.
KubeflowJobController = "kubeflow.org/mpi-operator"

// MultiKueueController is the RunPolicy.ManagedBy value that names the
// MultiKueue controller.
MultiKueueController = "kueue.x-k8s.io/multikueue"
)

// RunPolicy encapsulates various runtime policies of the distributed training
// job, for example how to clean up resources and how long the job can stay
// active.
Expand Down Expand Up @@ -131,6 +139,16 @@ type RunPolicy struct {
// Defaults to false.
// +kubebuilder:default:=false
Suspend *bool `json:"suspend,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a job.
// The value must be either empty, 'kubeflow.org/mpi-operator' or
// 'kueue.x-k8s.io/multikueue'.
// The mpi-operator reconciles a job which doesn't have this
// field at all or whose value is the reserved string
// 'kubeflow.org/mpi-operator', but delegates reconciling the job
// with 'kueue.x-k8s.io/multikueue' to Kueue.
// The field is immutable.
ManagedBy *string `json:"managedBy,omitempty"`
}

type LauncherCreationPolicy string
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions pkg/apis/kubeflow/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ var (

validRestartPolicies = sets.NewString(
string(kubeflow.RestartPolicyNever),
string(kubeflow.RestartPolicyOnFailure),
)
string(kubeflow.RestartPolicyOnFailure))

validManagedBy = sets.NewString(
string(kubeflow.MultiKueueController),
string(kubeflow.KubeflowJobController))
)

func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList {
Expand Down Expand Up @@ -98,6 +101,11 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error
if policy.BackoffLimit != nil {
errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*policy.BackoffLimit), path.Child("backoffLimit"))...)
}
if policy.ManagedBy != nil {
if !validManagedBy.Has(*policy.ManagedBy) {
errs = append(errs, field.NotSupported(path.Child("managedBy"), *policy.ManagedBy, validManagedBy.List()))
}
}
return errs
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func TestValidateMPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](-1),
ActiveDeadlineSeconds: ptr.To[int64](-1),
BackoffLimit: ptr.To[int32](-1),
ManagedBy: ptr.To("invalid.com/controller"),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: kubeflow.MPIImplementation("Unknown"),
Expand Down Expand Up @@ -239,6 +240,10 @@ func TestValidateMPIJob(t *testing.T) {
Type: field.ErrorTypeInvalid,
Field: "spec.runPolicy.backoffLimit",
},
{
Type: field.ErrorTypeNotSupported,
Field: "spec.runPolicy.managedBy",
},
{
Type: field.ErrorTypeNotSupported,
Field: "spec.mpiImplementation",
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,11 @@ func (c *MPIJobController) syncHandler(key string) error {
// Set default for the new mpiJob.
scheme.Scheme.Default(mpiJob)

// Jobs owned by another controller are left untouched: log and stop before
// any reconciliation work is done.
if manager := managedByExternalController(mpiJob.Spec.RunPolicy.ManagedBy); manager != nil {
	// InfoS takes key/value pairs; the Print-style Info would glue the key
	// onto the message and print the pointer address instead of the name.
	klog.V(2).InfoS("Skipping MPIJob managed by a custom controller", "managed-by", *manager)
	return nil
}

// for mpi job that is terminating, just return.
if mpiJob.DeletionTimestamp != nil {
return nil
Expand Down Expand Up @@ -1722,3 +1727,10 @@ func truncateMessage(message string) string {
suffix := "..."
return message[:eventMessageLimit-len(suffix)] + suffix
}

// managedByExternalController returns controllerName when it designates a
// controller other than kubeflow.KubeflowJobController. It returns nil when
// controllerName is nil or equal to kubeflow.KubeflowJobController.
func managedByExternalController(controllerName *string) *string {
	if controllerName == nil {
		return nil
	}
	if *controllerName == kubeflow.KubeflowJobController {
		return nil
	}
	return controllerName
}
12 changes: 12 additions & 0 deletions pkg/controller/mpi_job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,18 @@ func TestDoNothingWithInvalidMPIJob(t *testing.T) {
f.run(getKey(mpiJob, t))
}

// TestDoNothingWithMPIJobManagedExternally runs the controller fixture against
// an MPIJob whose RunPolicy.ManagedBy is kubeflow.MultiKueueController. No
// expectations are registered on the fixture before it runs.
func TestDoNothingWithMPIJobManagedExternally(t *testing.T) {
	fixture := newFixture(t, "")

	workers := int32(1)
	started := metav1.Now()
	finished := metav1.Now()

	job := newMPIJob("test", &workers, &started, &finished)
	job.Spec.MPIImplementation = kubeflow.MPIImplementationOpenMPI
	job.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)

	fixture.setUpMPIJob(job)
	fixture.run(getKey(job, t))
}

func TestAllResourcesCreated(t *testing.T) {
impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH}
for _, implementation := range impls {
Expand Down
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1RunPolicy.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions test/e2e/mpi_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -164,6 +165,27 @@ var _ = ginkgo.Describe("MPIJob", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})

ginkgo.It("should not be updated when managed externally, only created", func() {
	// Hand the job over to MultiKueue before it is created.
	mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
	ctx := context.Background()
	mpiJob = createJob(ctx, mpiJob)

	// Re-read the job repeatedly over 5 seconds; every Get must succeed, and
	// the most recent copy is kept for the condition checks below.
	gomega.Consistently(func() error {
		updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		mpiJob = updatedJob
		return nil
	}, 5*time.Second, waitInterval).Should(gomega.Succeed())

	// The job exists, but its status must carry neither the Created nor the
	// Succeeded condition.
	condition := getJobCondition(mpiJob, kubeflow.JobCreated)
	gomega.Expect(condition).To(gomega.BeNil())
	condition = getJobCondition(mpiJob, kubeflow.JobSucceeded)
	gomega.Expect(condition).To(gomega.BeNil())
})
})

})
Expand Down
Loading
Loading