From 55ad4cb5406e5b65328db3d731e6087b169767a8 Mon Sep 17 00:00:00 2001
From: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>
Date: Sun, 6 Apr 2025 11:25:09 +0000
Subject: [PATCH] Fix missing ReplicaIndexLabel when using RunLauncherAsWorker

Signed-off-by: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>
---
 pkg/controller/mpi_job_controller.go      | 24 +++++++++++++++++++----
 pkg/controller/mpi_job_controller_test.go |  3 ++-
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index 74fa5bdd..c00a98e3 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -1277,7 +1277,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM
 	// note that pod.spec.dnsConfig also affect the svc resolution
 	// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
 	// launcher can be reach with hostname or service name
-	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
+	if runLauncherAsWorker(mpiJob) {
 		name := mpiJob.Name + launcherSuffix
 		switch mpiJob.Spec.MPIImplementation {
 		case kubeflow.MPIImplementationOpenMPI:
@@ -1325,7 +1325,7 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
 	buffer.WriteString("#!/bin/sh\n")
 
 	// We don't check if launcher is running here, launcher should always be there or the job failed
-	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
+	if runLauncherAsWorker(mpiJob) {
 		name := mpiJob.Name + launcherSuffix
 		buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
 	}
@@ -1408,6 +1408,19 @@ func workerName(mpiJob *kubeflow.MPIJob, index int) string {
 	return fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, index)
 }
 
+func runLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
+	return ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false)
+}
+
+func workerReplicaIndexLabel(mpiJob *kubeflow.MPIJob, index int) string {
+	// When running the launcher as a worker, some integrations, such as Kueue's TAS, require all pods in the PodGroup
+	// to have a valid and unique index label. That's why we have to pad by one.
+	if runLauncherAsWorker(mpiJob) {
+		return strconv.Itoa(index + 1)
+	}
+	return strconv.Itoa(index)
+}
+
 // newWorker creates a new worker Pod for an MPIJob resource. It also
 // sets the appropriate OwnerReferences on the resource so handleObject can
 // discover the MPIJob resource that 'owns' it.
@@ -1423,7 +1436,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
 	for key, value := range defaultLabels(mpiJob.Name, worker) {
 		podTemplate.Labels[key] = value
 	}
-	podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index)
+	podTemplate.Labels[kubeflow.ReplicaIndexLabel] = workerReplicaIndexLabel(mpiJob, index)
 	podTemplate.Spec.Hostname = name
 	podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name.
 	if podTemplate.Spec.HostNetwork {
@@ -1509,6 +1522,9 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
 	if c.PodGroupCtrl != nil {
 		c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
 	}
+	if runLauncherAsWorker(mpiJob) {
+		podTemplate.Labels[kubeflow.ReplicaIndexLabel] = "0"
+	}
 	podTemplate.Spec.Hostname = launcherName
 	podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name.
 	if podTemplate.Spec.HostNetwork {
@@ -1535,7 +1551,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
 		case kubeflow.MPIImplementationMPICH:
 			container.Env = append(container.Env, mpichEnvVars...)
 		}
-		if !ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
+		if !runLauncherAsWorker(mpiJob) {
 			container.Env = append(container.Env,
 				// We overwrite these environment variables so that users will not
 				// be mistakenly using GPU resources for launcher due to potential
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index 49585daa..8ad5e4bc 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -1391,6 +1391,7 @@ func TestNewLauncherAndWorker(t *testing.T) {
 						kubeflow.OperatorNameLabel: kubeflow.OperatorName,
 						kubeflow.JobNameLabel:      "foo",
 						kubeflow.JobRoleLabel:      "launcher",
+						kubeflow.ReplicaIndexLabel: "0",
 					},
 				},
 				Spec: corev1.PodSpec{
@@ -1445,7 +1446,7 @@ func TestNewLauncherAndWorker(t *testing.T) {
 						kubeflow.OperatorNameLabel: kubeflow.OperatorName,
 						kubeflow.JobNameLabel:      "foo",
 						kubeflow.JobRoleLabel:      "worker",
-						kubeflow.ReplicaIndexLabel: "0",
+						kubeflow.ReplicaIndexLabel: "1",
 					},
 				},
 				Spec: corev1.PodSpec{
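
For a quick illustration of the label assignment this patch produces, here is a minimal standalone Go sketch (not part of the patch itself); the MPIJob spec lookup is replaced by a plain boolean flag purely for brevity:

package main

import (
	"fmt"
	"strconv"
)

// workerReplicaIndexLabel mirrors the helper added in this patch, with the
// MPIJob spec lookup replaced by a plain bool for the sake of the sketch.
func workerReplicaIndexLabel(runLauncherAsWorker bool, index int) string {
	if runLauncherAsWorker {
		// The launcher occupies index "0", so worker indices are shifted up
		// by one to keep every ReplicaIndexLabel in the PodGroup unique.
		return strconv.Itoa(index + 1)
	}
	return strconv.Itoa(index)
}

func main() {
	// With RunLauncherAsWorker=true and two workers, the labels come out as:
	// launcher -> "0", worker 0 -> "1", worker 1 -> "2"
	fmt.Println(`launcher -> "0"`)
	for i := 0; i < 2; i++ {
		fmt.Printf("worker %d -> %q\n", i, workerReplicaIndexLabel(true, i))
	}
}

With RunLauncherAsWorker enabled and two workers, the PodGroup therefore carries the contiguous, unique indices "0", "1", "2", which is what integrations such as Kueue's TAS expect.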