@@ -1277,7 +1277,7 @@ func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigM
1277
1277
// note that pod.spec.dnsConfig also affect the svc resolution
1278
1278
// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
1279
1279
// launcher can be reach with hostname or service name
1280
- if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
1280
+ if runLauncherAsWorker(mpiJob) {
1281
1281
name := mpiJob.Name + launcherSuffix
1282
1282
switch mpiJob.Spec.MPIImplementation {
1283
1283
case kubeflow.MPIImplementationOpenMPI:
@@ -1325,7 +1325,7 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo
1325
1325
buffer.WriteString("#!/bin/sh\n")
1326
1326
1327
1327
// We don't check if launcher is running here, launcher should always be there or the job failed
1328
- if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
1328
+ if runLauncherAsWorker(mpiJob) {
1329
1329
name := mpiJob.Name + launcherSuffix
1330
1330
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", name, mpiJob.Name, mpiJob.Namespace))
1331
1331
}
@@ -1408,6 +1408,19 @@ func workerName(mpiJob *kubeflow.MPIJob, index int) string {
1408
1408
return fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, index)
1409
1409
}
1410
1410
1411
+ func runLauncherAsWorker(mpiJob *kubeflow.MPIJob) bool {
1412
+ return ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false)
1413
+ }
1414
+
1415
+ func workerReplicaIndexLabel(mpiJob *kubeflow.MPIJob, index int) string {
1416
+ // When running the launcher as a worker, some integrations such as Kueue's TAS, require all pods in the PodGroup
1417
+ // to have a valid and unique index label. That's why we have to pad by one.
1418
+ if runLauncherAsWorker(mpiJob) {
1419
+ return strconv.Itoa(index + 1)
1420
+ }
1421
+ return strconv.Itoa(index)
1422
+ }
1423
+
1411
1424
// newWorker creates a new worker Pod for an MPIJob resource. It also
1412
1425
// sets the appropriate OwnerReferences on the resource so handleObject can
1413
1426
// discover the MPIJob resource that 'owns' it.
@@ -1423,7 +1436,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
1423
1436
for key, value := range defaultLabels(mpiJob.Name, worker) {
1424
1437
podTemplate.Labels[key] = value
1425
1438
}
1426
- podTemplate.Labels[kubeflow.ReplicaIndexLabel] = strconv.Itoa(index)
1439
+ podTemplate.Labels[kubeflow.ReplicaIndexLabel] = workerReplicaIndexLabel(mpiJob, index)
1427
1440
podTemplate.Spec.Hostname = name
1428
1441
podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name.
1429
1442
if podTemplate.Spec.HostNetwork {
@@ -1509,6 +1522,9 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
1509
1522
if c.PodGroupCtrl != nil {
1510
1523
c.PodGroupCtrl.decoratePodTemplateSpec(podTemplate, mpiJob.Name)
1511
1524
}
1525
+ if runLauncherAsWorker(mpiJob) {
1526
+ podTemplate.Labels[kubeflow.ReplicaIndexLabel] = "0"
1527
+ }
1512
1528
podTemplate.Spec.Hostname = launcherName
1513
1529
podTemplate.Spec.Subdomain = mpiJob.Name // Matches job' Service name.
1514
1530
if podTemplate.Spec.HostNetwork {
@@ -1535,7 +1551,7 @@ func (c *MPIJobController) newLauncherPodTemplate(mpiJob *kubeflow.MPIJob) corev
1535
1551
case kubeflow.MPIImplementationMPICH:
1536
1552
container.Env = append(container.Env, mpichEnvVars...)
1537
1553
}
1538
- if !ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
1554
+ if !runLauncherAsWorker(mpiJob) {
1539
1555
container.Env = append(container.Env,
1540
1556
// We overwrite these environment variables so that users will not
1541
1557
// be mistakenly using GPU resources for launcher due to potential
0 commit comments