
Commit 978f4b9

andreyvelich authored and johnugeorge committed
[SDK] Add resources per worker for Create Job API (kubeflow#1990)
* [SDK] Add resources for create Job API
* Fix unbound var
* Assign values in get pod template
* Add torchrun issue
* Test to create PyTorchJob from Image
* Fix e2e to create from image
* Fix condition
* Modify check test conditions
1 parent 11fa67c commit 978f4b9

File tree

6 files changed: +267 -200 lines changed


sdk/python/kubeflow/training/api/training_client.py

+77 -76
@@ -25,11 +25,11 @@
 from kubeflow.training.constants import constants
 from kubeflow.training.utils import utils
 from kubeflow.storage_initializer.constants import (
-    INIT_CONTAINER_MOUNT_PATH,
     VOLUME_PATH_DATASET,
     VOLUME_PATH_MODEL,
 )
 
+
 logger = logging.getLogger(__name__)
 
 status_logger = utils.StatusLogger(
@@ -139,64 +139,50 @@ def train(
 
         namespace = namespace or self.namespace
 
-        if isinstance(resources_per_worker, dict):
-            if "gpu" in resources_per_worker:
-                if resources_per_worker["gpu"] is not None and (
-                    num_procs_per_worker > resources_per_worker["gpu"]
-                ):
-                    raise ValueError(
-                        "Insufficient gpu resources allocated to the container."
-                    )
-                if resources_per_worker["gpu"] is not None:
-                    resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop(
-                        "gpu"
-                    )
-
-            if (
-                "cpu" not in resources_per_worker
-                or "memory" not in resources_per_worker
-            ):
-                raise ValueError("cpu and memory resources not specified")
-
-            resources_per_worker = client.V1ResourceRequirements(
-                requests=resources_per_worker,
-                limits=resources_per_worker,
-            )
-
+        # TODO (andreyvelich): PVC Creation should be part of Training Operator Controller.
+        # Ref issue: https://github.com/kubeflow/training-operator/issues/1971
         try:
             self.core_api.create_namespaced_persistent_volume_claim(
                 namespace=namespace,
                 body=utils.get_pvc_spec(
-                    pvc_name=constants.TRAINER_PVC_NAME,
+                    pvc_name=constants.STORAGE_INITIALIZER,
                     namespace=namespace,
-                    storage_size=storage_config["size"],
-                    storage_class=storage_config["storage_class"],
+                    storage_config=storage_config,
                 ),
             )
         except Exception as e:
             pvc_list = self.core_api.list_namespaced_persistent_volume_claim(namespace)
             # Check if the PVC with the specified name exists
             for pvc in pvc_list.items:
-                if pvc.metadata.name == constants.TRAINER_PVC_NAME:
+                if pvc.metadata.name == constants.STORAGE_INITIALIZER:
                     print(
-                        f"PVC '{constants.TRAINER_PVC_NAME}' already exists in namespace '{namespace}'."
+                        f"PVC '{constants.STORAGE_INITIALIZER}' already exists in namespace "
+                        f"{namespace}."
                     )
                     break
             else:
                 raise RuntimeError("failed to create pvc")
 
         if isinstance(model_provider_parameters, HuggingFaceModelParams):
             mp = "hf"
+        else:
+            raise ValueError(
+                f"Invalid model provider parameters {model_provider_parameters}"
+            )
 
         if isinstance(dataset_provider_parameters, S3DatasetParams):
             dp = "s3"
         elif isinstance(dataset_provider_parameters, HfDatasetParams):
             dp = "hf"
+        else:
+            raise ValueError(
+                f"Invalid dataset provider parameters {dataset_provider_parameters}"
+            )
 
         # create init container spec
         init_container_spec = utils.get_container_spec(
-            name=constants.STORAGE_CONTAINER,
-            image=constants.STORAGE_CONTAINER_IMAGE,
+            name=constants.STORAGE_INITIALIZER,
+            base_image=constants.STORAGE_INITIALIZER_IMAGE,
             args=[
                 "--model_provider",
                 mp,
@@ -207,18 +193,13 @@ def train(
                 "--dataset_provider_parameters",
                 json.dumps(dataset_provider_parameters.__dict__),
             ],
-            volume_mounts=[
-                models.V1VolumeMount(
-                    name=constants.TRAINER_PV,
-                    mount_path=INIT_CONTAINER_MOUNT_PATH,
-                )
-            ],
+            volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
         )
 
         # create app container spec
         container_spec = utils.get_container_spec(
             name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"],
-            image=constants.TRAINER_TRANSFORMER_IMAGE,
+            base_image=constants.TRAINER_TRANSFORMER_IMAGE,
             args=[
                 "--model_uri",
                 model_provider_parameters.model_uri,
@@ -235,41 +216,22 @@ def train(
                 "--training_parameters",
                 json.dumps(train_parameters.training_parameters.to_dict()),
             ],
-            volume_mounts=[
-                models.V1VolumeMount(
-                    name=constants.TRAINER_PV,
-                    mount_path=INIT_CONTAINER_MOUNT_PATH,
-                )
-            ],
+            volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT],
             resources=resources_per_worker,
         )
 
         # create worker pod spec
         worker_pod_template_spec = utils.get_pod_template_spec(
-            job_kind=constants.PYTORCHJOB_KIND,
-            containers_spec=[container_spec],
-            volumes_spec=[
-                models.V1Volume(
-                    name=constants.TRAINER_PV,
-                    persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=constants.TRAINER_PVC_NAME
-                    ),
-                )
-            ],
+            containers=[container_spec],
+            init_containers=[init_container_spec],
+            volumes=[constants.STORAGE_INITIALIZER_VOLUME],
         )
 
         # create master pod spec
         master_pod_template_spec = utils.get_pod_template_spec(
-            job_kind=constants.PYTORCHJOB_KIND,
-            containers_spec=[init_container_spec, container_spec],
-            volumes_spec=[
-                models.V1Volume(
-                    name=constants.TRAINER_PV,
-                    persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=constants.TRAINER_PVC_NAME
-                    ),
-                )
-            ],
+            containers=[container_spec],
+            init_containers=[init_container_spec],
+            volumes=[constants.STORAGE_INITIALIZER_VOLUME],
         )
 
         job = utils.get_pytorchjob_template(
@@ -293,6 +255,7 @@ def create_job(
         train_func: Optional[Callable] = None,
         parameters: Optional[Dict[str, Any]] = None,
         num_workers: Optional[int] = None,
+        resources_per_worker: Union[dict, models.V1ResourceRequirements, None] = None,
         num_chief_replicas: Optional[int] = None,
         num_ps_replicas: Optional[int] = None,
         packages_to_install: Optional[List[str]] = None,
@@ -324,6 +287,26 @@ def create_job(
                 set, Base Image must support `bash` CLI to execute the training script.
             parameters: Dict of input parameters that training function might receive.
             num_workers: Number of Worker replicas for the Job.
+            resources_per_worker: A parameter that lets you specify how much
+                resources each Worker container should have. You can either specify a
+                kubernetes.client.V1ResourceRequirements object (documented here:
+                https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ResourceRequirements.md)
+                or a dictionary that includes one or more of the following keys:
+                `cpu`, `memory`, or `gpu` (other keys will be ignored). Appropriate
+                values for these keys are documented here:
+                https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
+                For example:
+                ```
+                {
+                    "cpu": "1",
+                    "memory": "2Gi",
+                    "gpu": "1",
+                }
+                ```
+                Please note, `gpu` specifies a resource request with a key of
+                `nvidia.com/gpu`, i.e. an NVIDIA GPU. If you need a different type
+                of GPU, pass in a V1ResourceRequirement instance instead, since it's
+                more flexible. This parameter is optional and defaults to None.
             num_chief_replicas: Number of Chief replicas for the TFJob. Number
                 of Chief replicas can't be more than 1.
             num_ps_replicas: Number of Parameter Server replicas for the TFJob.
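
The docstring above describes the dict form of the new parameter. For reference, a minimal sketch of how it might be called, assuming the kubeflow-training SDK is installed, a cluster with the training operator, and the client's default PyTorchJob kind; the job name, namespace, and resource values below are illustrative, not taken from the commit:

```
from kubeflow.training import TrainingClient


def train_func():
    # Placeholder training function; real training code would go here.
    print("training step")


# Per the docstring above, the "gpu" key becomes an "nvidia.com/gpu" request/limit.
TrainingClient().create_job(
    name="create-job-demo",
    namespace="default",
    train_func=train_func,
    num_workers=2,
    resources_per_worker={
        "cpu": "1",
        "memory": "2Gi",
        "gpu": "1",
    },
)
```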
@@ -353,29 +336,40 @@ def create_job(
         namespace = namespace or self.namespace
         job_kind = job_kind or self.job_kind
         if job is not None:
-            job_kind = job.kind
+            job_kind = str(job.kind)
 
         if job_kind not in constants.JOB_PARAMETERS:
             raise ValueError(
                 f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
             )
 
         # If Training function or base image is set, configure Job template.
-        if train_func is not None or base_image is not None:
+        if job is None and (train_func is not None or base_image is not None):
             # Job name must be set to configure Job template.
             if name is None:
                 raise ValueError(
                     "Job name must be set to configure Job from function or image"
                 )
 
-            # Get Pod template spec from function or image.
-            pod_template_spec = utils.get_pod_template_spec(
-                job_kind=job_kind,
+            # Assign the default base image.
+            # TODO (andreyvelich): Add base image for other Job kinds.
+            if base_image is None:
+                base_image = constants.JOB_PARAMETERS[job_kind]["base_image"]
+
+            # Get Training Container template.
+            container_spec = utils.get_container_spec(
+                name=constants.JOB_PARAMETERS[job_kind]["container"],
                 base_image=base_image,
                 train_func=train_func,
-                parameters=parameters,
+                train_func_parameters=parameters,
                 packages_to_install=packages_to_install,
                 pip_index_url=pip_index_url,
+                resources=resources_per_worker,
+            )
+
+            # Get Pod template spec using the above container.
+            pod_template_spec = utils.get_pod_template_spec(
+                containers=[container_spec],
             )
 
         # Configure template for different Jobs.
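
Because `create_job` now forwards `resources_per_worker` straight into `utils.get_container_spec` (the `resources=` argument above), a `kubernetes.client.V1ResourceRequirements` object can be passed instead of a dict when the dict form is too limited, for example for an accelerator other than `nvidia.com/gpu`. A hedged sketch; the resource key, image, and names below are illustrative assumptions, not defined by this commit:

```
from kubernetes import client
from kubeflow.training import TrainingClient

# Explicit requests/limits; "amd.com/gpu" is only an example resource key.
resources = client.V1ResourceRequirements(
    requests={"cpu": "2", "memory": "4Gi", "amd.com/gpu": "1"},
    limits={"cpu": "2", "memory": "4Gi", "amd.com/gpu": "1"},
)

TrainingClient().create_job(
    name="create-job-from-image",
    namespace="default",
    base_image="docker.io/example/train:latest",  # hypothetical training image
    num_workers=2,
    resources_per_worker=resources,
)
```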
@@ -403,16 +397,21 @@ def create_job(
             )
 
         # Verify Job object type.
-        if not isinstance(job, constants.JOB_MODELS):
-            raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
+        if not isinstance(
+            job,
+            getattr(models, constants.JOB_PARAMETERS[job_kind]["model"]),
+        ):
+            raise ValueError(
+                f"Job must be one of these types: {constants.JOB_MODELS}, but Job is: {type(job)}"
+            )
 
         # Create the Training Job.
         try:
             self.custom_api.create_namespaced_custom_object(
                 constants.GROUP,
                 constants.VERSION,
                 namespace,
-                constants.JOB_PARAMETERS[job.kind]["plural"],
+                constants.JOB_PARAMETERS[job_kind]["plural"],
                 job,
             )
         except multiprocessing.TimeoutError:
@@ -580,7 +579,9 @@ def get_job_conditions(
                 f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
             )
 
-        if job is not None and not isinstance(job, constants.JOB_MODELS):
+        if job is not None and not isinstance(
+            job, getattr(models, constants.JOB_PARAMETERS[job_kind]["model"])
+        ):
             raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
 
         # If Job is not set, get the Training Job.
@@ -1235,7 +1236,7 @@ def delete_job(
         name: str,
         namespace: Optional[str] = None,
         job_kind: Optional[str] = None,
-        delete_options: Optional[client.V1DeleteOptions] = None,
+        delete_options: Optional[models.V1DeleteOptions] = None,
     ):
         """Delete the Training Job
 
sdk/python/kubeflow/training/api/training_client_test.py

+10 -9
@@ -123,15 +123,6 @@ def __init__(self, kind) -> None:
         },
         ValueError,
     ),
-    (
-        "invalid pod template spec parameters",
-        {
-            "name": "test job",
-            "train_func": lambda: "test train function",
-            "job_kind": constants.MXJOB_KIND,
-        },
-        KeyError,
-    ),
     (
         "paddle job can't be created using function",
         {
@@ -174,6 +165,16 @@ def __init__(self, kind) -> None:
         },
         "success",
     ),
+    (
+        "valid flow to create job using image",
+        {
+            "name": "test-job",
+            "namespace": "test",
+            "base_image": "docker.io/test-training",
+            "num_workers": 2,
+        },
+        "success",
+    ),
 ]
 
 