
Commit 11fa67c

[SDK] Fix Worker and Master templates for PyTorchJob (kubeflow#1988)

andreyvelich authored and johnugeorge committed
1 parent 9761d33 · commit 11fa67c

File tree: 7 files changed, +76 −59 lines


.github/workflows/test-python.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -26,6 +26,6 @@ jobs:
       run: |
         pip install pytest python-dateutil urllib3 kubernetes
         pip install -U './sdk/python[huggingface]'
-
+
     - name: Run unit test for training sdk
-      run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
+      run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
```

(The changed line pairs render identically; the edits here appear to be whitespace-only.)

docs/proposals/train_api_proposal.md

Lines changed: 23 additions & 20 deletions
````diff
@@ -2,18 +2,20 @@

 **<h3>Authors:</h3>**

-* Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
-* Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix
+- Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
+- Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix

 **<h3>Status</h3>**

-* 10 Nov 2023 (v1)
-* 30 Nov 2023 (v2)
+- 10 Nov 2023 (v1)
+- 30 Nov 2023 (v2)

 **<h3>Goals</h3>**
+
 1. To add a higher level train api for fine-tuning/training LLMs.

 **<h3>Non Goals / Limitations</h3>**
+
 1. The dataset is assumed to be preprocessed by the user.

 2. Currently only pytorch framework will be supported for running distributed training.
@@ -30,7 +32,7 @@ LLMs are being widely used for generative AI tasks and as their adoption is incr

 **<h3>Background</h3>**

-Currently, there are two flows for data scientists to start using Training operator for their distributed training needs. 
+Currently, there are two flows for data scientists to start using Training operator for their distributed training needs.

 **<h4>Traditional method</h4>**

@@ -48,7 +50,7 @@ To provide a better user experience, a new higher level SDK was added in[ https:
 training_client.create_job(
     name=pytorchjob_name,
     train_func=train_function,
-    num_worker_replicas=3, # How many PyTorch Workers will be created.
+    num_workers=3, # How many PyTorch Workers will be created.
 )
 ```

@@ -67,16 +69,16 @@ dataset_args = datasetProviderClass()
 # Arguments related to the trainer code
 parameters = {key:value pairs}
 trainingClient.train(
-    num_workers=1, 
+    num_workers=1,
     num_procs_per_worker = 1,
-    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"}, 
-    model_args, 
-    dataset_args, 
+    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
+    model_args,
+    dataset_args,
     parameters
 )
 ```

-Example: 
+Example:

 ```python
 @dataclass
@@ -98,16 +100,16 @@ class HuggingFaceTrainParams:
 transformerClass = field()

 trainingClient.train(
-    num_workers=1, 
+    num_workers=1,
     num_procs_per_worker = 1,
-    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"}, 
+    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
     HuggingFaceModelParams(model='hf://openchat/openchat_3.5', access_token = "hf_..." ),
-    S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"), 
+    S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"),
     HuggingFaceTrainParams(learning_rate=0.1, transformerClass="Trainer", peft_config = {})
 )
 ```

-The new proposed API takes following arguments 
+The new proposed API takes following arguments

 1. System parameters - Number of workers, number of resources per workers(GPUs per worker).
 2. Model parameters - Model provider and repository details.
@@ -116,7 +118,7 @@ The new proposed API takes following arguments

 **<h3>Implementation</h3>**

-1. Setup **init** **containers** that download the model and dataset to a PVC. Based on the specified model provider, corresponding training utility functions will be used. Eg: For Huggingface provider, Huggingface trainer can be used. For this **get_pytorchjob_template** function in the sdk needs to be changed to add init containers spec.. Inorder to download models and data sets, we need to support different providers like kaggle, hugging face, s3 or git lfs. The data can be stored in a shared volume between the init container and the main container.<br /> <br /> This way to download models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we adopt the way of creating batch/v1 Job to download models to PVC, we need to force users to prepare ReadWriteOnce and ReadOnlyMany PVCs.<br /> <br /> A new folder containing the code for downloading model and dataset can be added to generate the images for init_containers. Abstract classes will be used as base to create when dataset download, model download and training loop is written for base images. <br /> These parameters will be passed as container args or environment variables. 
+1. Setup **init** **containers** that download the model and dataset to a PVC. Based on the specified model provider, corresponding training utility functions will be used. Eg: For Huggingface provider, Huggingface trainer can be used. For this **get_pytorchjob_template** function in the sdk needs to be changed to add init containers spec.. Inorder to download models and data sets, we need to support different providers like kaggle, hugging face, s3 or git lfs. The data can be stored in a shared volume between the init container and the main container.<br /> <br /> This way to download models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we adopt the way of creating batch/v1 Job to download models to PVC, we need to force users to prepare ReadWriteOnce and ReadOnlyMany PVCs.<br /> <br /> A new folder containing the code for downloading model and dataset can be added to generate the images for init_containers. Abstract classes will be used as base to create when dataset download, model download and training loop is written for base images. <br /> These parameters will be passed as container args or environment variables.

 ```
 sdk/python
@@ -129,12 +131,13 @@ sdk/python
 -> storage.py #this is the file which will be invoked from the dockerfile
 -> Dockerfile
 ```
+
 ```python
 # code present in abstract_model_provider.py
 class modelProvider():
     @abstractmethod
     def load_config(self):
-        pass 
+        pass

     @abstractmethod
     def download_model(self):
@@ -154,9 +157,9 @@ class HuggingFace(modelProvider):
 training_client.create_job(name="pytorchjob_name",train_func=custom_training_function, num_of_nodes=1, gpus_per_node = 4)
 ```

-3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In case of Hugging Face models, we can use Hugging Face Transformer library’s Trainer class as the training function. 
+3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In case of Hugging Face models, we can use Hugging Face Transformer library’s Trainer class as the training function.

-4. The launch command of the training job needs to be changed to torchrun to take **nnodes** and **nproc_per_node** into effect inside **get_pod_template_spec** function in the training operator SDK. 
+4. The launch command of the training job needs to be changed to torchrun to take **nnodes** and **nproc_per_node** into effect inside **get_pod_template_spec** function in the training operator SDK.

 ```python
 exec_script = textwrap.dedent(
@@ -169,4 +172,4 @@ exec_script = textwrap.dedent(
 "torchrun", "$program_path/ephemeral_script.py\
 """"
 )
-```
+```
````
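For orientation, here is a hedged sketch of how the renamed `train` API from this proposal is meant to be invoked. The import paths are the ones visible in the `training_client.py` diff below; the keyword names and dataclass fields mirror the proposal text and may differ from the released SDK.

```python
# A minimal sketch, not a verbatim SDK example. Field names follow the
# proposal's own example; treat them as assumptions, not the shipped API.
from kubeflow.training import TrainingClient
from kubeflow.storage_initializer.hugging_face import (
    HuggingFaceModelParams,
    HuggingFaceTrainParams,
)
from kubeflow.storage_initializer.s3 import S3DatasetParams

TrainingClient().train(
    name="llm-fine-tune",          # `name` is now a required argument
    num_workers=2,                 # total Pods: 1 Master + 1 Worker
    num_procs_per_worker=1,
    resources_per_worker={"gpu": "2", "cpu": 8, "memory": "16Gi"},
    model_provider_parameters=HuggingFaceModelParams(
        model="hf://openchat/openchat_3.5", access_token="hf_..."
    ),
    dataset_provider_parameters=S3DatasetParams(
        dataset="s3://doc-example-bucket1/train_dataset",
        eval_dataset="s3://doc-example-bucket1/eval_dataset",
        access_token="s3 access token",
        region="us-west-2",
    ),
    train_parameters=HuggingFaceTrainParams(
        learning_rate=0.1, transformerClass="Trainer", peft_config={}
    ),
)
```

Note that `num_workers` now counts the total Pods, Master included, which is why the `train` implementation below passes `num_workers` straight through to the template instead of `num_workers - 1`.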

examples/sdk/create-pytorchjob-from-func.ipynb

Lines changed: 1 addition & 1 deletion
```diff
@@ -364,7 +364,7 @@
     "training_client.create_job(\n",
     "    name=pytorchjob_name,\n",
     "    train_func=train_pytorch_model,\n",
-    "    num_worker_replicas=3, # How many PyTorch Workers will be created.\n",
+    "    num_workers=3, # How many PyTorch Workers will be created.\n",
     ")"
    ]
   },
```
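A short, hedged usage sketch of the renamed argument, assuming the SDK from this commit is installed and a cluster is reachable; the function body is a stand-in for the notebook's real training code.

```python
from kubeflow.training import TrainingClient

def train_pytorch_model():
    # Stand-in for the notebook's actual training function.
    print("Training...")

# Assumes a kubeconfig pointing at a cluster with the training operator.
TrainingClient().create_job(
    name="pytorchjob-demo",
    train_func=train_pytorch_model,
    num_workers=3,  # replaces num_worker_replicas: 1 Master + 2 Worker replicas
)
```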

sdk/python/kubeflow/training/api/training_client.py

Lines changed: 18 additions & 13 deletions
```diff
@@ -95,16 +95,18 @@ def __init__(

     def train(
         self,
-        name: str = None,
-        namespace: str = None,
+        name: str,
+        namespace: Optional[str] = None,
         num_workers: int = 1,
         num_procs_per_worker: int = 1,
-        storage_config: Dict[str, str] = {"size": "10Gi", "storage_class": None},
+        storage_config: Dict[str, Optional[str]] = {
+            "size": "10Gi",
+            "storage_class": None,
+        },
         model_provider_parameters=None,
         dataset_provider_parameters=None,
         train_parameters=None,
         resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = None,
-        # Dict[Literal["gpu", "cpu", "memory"], any] = None,
     ):
         """
         Higher level train api
@@ -116,8 +118,9 @@ def train(
             import peft
             import transformers
         except ImportError:
-            print(
-                "train api dependencies not installed. Run pip install -U 'kubeflow-training[huggingface]' "
+            raise ImportError(
+                "Train API dependencies not installed. "
+                + "Run: pip install -U 'kubeflow-training[huggingface]' "
             )
         from kubeflow.storage_initializer.s3 import S3DatasetParams
         from kubeflow.storage_initializer.hugging_face import (
@@ -274,7 +277,7 @@ def train(
             namespace=namespace,
             master_pod_template_spec=master_pod_template_spec,
             worker_pod_template_spec=worker_pod_template_spec,
-            num_worker_replicas=num_workers - 1,
+            num_workers=num_workers,
             num_procs_per_worker=num_procs_per_worker,
         )

@@ -289,7 +292,7 @@ def create_job(
         base_image: Optional[str] = None,
         train_func: Optional[Callable] = None,
         parameters: Optional[Dict[str, Any]] = None,
-        num_worker_replicas: Optional[int] = None,
+        num_workers: Optional[int] = None,
         num_chief_replicas: Optional[int] = None,
         num_ps_replicas: Optional[int] = None,
         packages_to_install: Optional[List[str]] = None,
@@ -320,7 +323,7 @@ def create_job(
                 argument to define input parameters for the function. If `train_func` is
                 set, Base Image must support `bash` CLI to execute the training script.
             parameters: Dict of input parameters that training function might receive.
-            num_worker_replicas: Number of Worker replicas for the Job.
+            num_workers: Number of Worker replicas for the Job.
             num_chief_replicas: Number of Chief replicas for the TFJob. Number
                 of Chief replicas can't be more than 1.
             num_ps_replicas: Number of Parameter Server replicas for the TFJob.
@@ -382,20 +385,21 @@ def create_job(
                 name=name,
                 namespace=namespace,
                 pod_template_spec=pod_template_spec,
-                num_worker_replicas=num_worker_replicas,
+                num_workers=num_workers,
                 num_chief_replicas=num_chief_replicas,
                 num_ps_replicas=num_ps_replicas,
             )
-        elif job_kind == constants.PYTORCHJOB_KIND:
+        elif job_kind == constants.PYTORCHJOB_KIND and num_workers:
             job = utils.get_pytorchjob_template(
                 name=name,
                 namespace=namespace,
                 worker_pod_template_spec=pod_template_spec,
-                num_worker_replicas=num_worker_replicas,
+                num_workers=num_workers,
             )
         else:
             raise ValueError(
-                f"Job kind {job_kind} can't be created using function or image"
+                f"Job kind {job_kind} can't be created using function or image. "
+                + "Number of Workers must be set."
             )

         # Verify Job object type.
@@ -1052,6 +1056,7 @@ def get_job_logs(
             timeout: Optional, Kubernetes API server timeout in seconds
                 to execute the request.
             verbose: Whether to get Kubernetes events for Job and corresponding pods.
+                If you need to get events from all PyTorchJob's Pods, set `isMaster = False`.

         Returns:
             Dict[str, str]: A dictionary in which the keys are pod names and the
```
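One user-visible consequence of the `train` changes above, sketched under the assumption that `kubeflow-training` is installed without the `huggingface` extra: missing optional dependencies now raise an `ImportError` instead of printing a warning and continuing.

```python
# Hedged sketch of the new failure mode. Assumes a kubeconfig is available
# (TrainingClient() loads it on construction) but `peft`/`transformers`
# are not installed, so `train` fails fast per the diff above.
from kubeflow.training import TrainingClient

try:
    TrainingClient().train(name="llm-fine-tune")
except ImportError as err:
    # Previously this message was only printed and execution continued.
    print(f"caught: {err}")
```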

sdk/python/kubeflow/training/api/training_client_test.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -168,7 +168,7 @@ def __init__(self, kind) -> None:
             "namespace": "test",
             "train_func": lambda: print("Test Training Function"),
             "base_image": "docker.io/test-training",
-            "num_worker_replicas": "3",
+            "num_workers": 3,
             "packages_to_install": ["boto3==1.34.14"],
             "pip_index_url": "https://pypi.custom.com/simple",
         },
@@ -203,5 +203,5 @@ def test_create_job(training_client, test_name, kwargs, expected_output):
         training_client.create_job(**kwargs)
         assert expected_output == "success"
     except Exception as e:
-        assert type(e) == expected_output
+        assert type(e) is expected_output
     print("test execution complete")
```
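Why `assert type(e) is expected_output` is safer than `==`: comparing classes with `==` dispatches to the metaclass's `__eq__`, which can be overridden, while `is` checks class identity directly. A self-contained illustration with hypothetical classes (nothing here comes from the SDK):

```python
class LenientMeta(type):
    def __eq__(cls, other):
        # Deliberately over-permissive equality, to show why == is fragile.
        return True

    __hash__ = type.__hash__  # keep the class hashable after defining __eq__

class Lenient(metaclass=LenientMeta):
    pass

assert Lenient == ValueError          # True only because __eq__ lies
assert not (Lenient is ValueError)    # identity comparison stays exact
```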

sdk/python/kubeflow/training/utils/utils.py

Lines changed: 28 additions & 19 deletions
```diff
@@ -251,18 +251,14 @@ def get_tfjob_template(
     name: str,
     namespace: str,
     pod_template_spec: models.V1PodTemplateSpec,
-    num_worker_replicas: Optional[int] = None,
+    num_workers: Optional[int] = None,
     num_chief_replicas: Optional[int] = None,
     num_ps_replicas: Optional[int] = None,
 ):
     # Check if at least one replica is set.
     # TODO (andreyvelich): Remove this check once we have CEL validation.
     # Ref: https://github.com/kubeflow/training-operator/issues/1708
-    if (
-        num_worker_replicas is None
-        and num_chief_replicas is None
-        and num_ps_replicas is None
-    ):
+    if num_workers is None and num_chief_replicas is None and num_ps_replicas is None:
         raise ValueError("At least one replica for TFJob must be set")

     # Create TFJob template.
@@ -293,11 +289,11 @@ def get_tfjob_template(
             template=pod_template_spec,
         )

-    if num_worker_replicas is not None:
+    if num_workers is not None:
         tfjob.spec.tf_replica_specs[
             constants.REPLICA_TYPE_WORKER
         ] = models.KubeflowOrgV1ReplicaSpec(
-            replicas=num_worker_replicas,
+            replicas=num_workers,
             template=pod_template_spec,
         )

@@ -307,17 +303,17 @@ def get_tfjob_template(
 def get_pytorchjob_template(
     name: str,
     namespace: str,
-    master_pod_template_spec: models.V1PodTemplateSpec = None,
-    worker_pod_template_spec: models.V1PodTemplateSpec = None,
-    num_worker_replicas: Optional[int] = None,
-    num_procs_per_worker: Optional[int] = 0,
+    num_workers: int,
+    worker_pod_template_spec: Optional[models.V1PodTemplateSpec],
+    master_pod_template_spec: Optional[models.V1PodTemplateSpec] = None,
+    num_procs_per_worker: Optional[int] = None,
     elastic_policy: Optional[models.KubeflowOrgV1ElasticPolicy] = None,
 ):
-    # Check if at least one replica is set.
+    # Check if at least one Worker is set.
     # TODO (andreyvelich): Remove this check once we have CEL validation.
     # Ref: https://github.com/kubeflow/training-operator/issues/1708
-    if num_worker_replicas is None and master_pod_template_spec is None:
-        raise ValueError("At least one replica for PyTorchJob must be set")
+    if num_workers is None or num_workers < 0:
+        raise ValueError("At least one Worker for PyTorchJob must be set")

     # Create PyTorchJob template.
     pytorchjob = models.KubeflowOrgV1PyTorchJob(
@@ -330,32 +326,45 @@ def get_pytorchjob_template(
         ),
     )

-    if num_procs_per_worker > 0:
+    if num_procs_per_worker:
         pytorchjob.spec.nproc_per_node = str(num_procs_per_worker)
     if elastic_policy:
         pytorchjob.spec.elastic_policy = elastic_policy

+    # Create Master replica if that is set.
     if master_pod_template_spec:
         pytorchjob.spec.pytorch_replica_specs[
             constants.REPLICA_TYPE_MASTER
         ] = models.KubeflowOrgV1ReplicaSpec(
             replicas=1,
             template=master_pod_template_spec,
         )
+    # If we don't define Master template, use the Worker template.
+    else:
+        pytorchjob.spec.pytorch_replica_specs[
+            constants.REPLICA_TYPE_MASTER
+        ] = models.KubeflowOrgV1ReplicaSpec(
+            replicas=1,
+            template=worker_pod_template_spec,
+        )

-    if num_worker_replicas:
+    # Create Worker with num_workers - 1 replicas.
+    # TODO (andreyvelich): Investigate if we can run PyTorchJob without the Master
+    # Currently, if Master is not set, Training Operator controller
+    # doesn't set RANK and WORLD_SIZE for PyTorchJob
+    if num_workers > 1:
         pytorchjob.spec.pytorch_replica_specs[
             constants.REPLICA_TYPE_WORKER
         ] = models.KubeflowOrgV1ReplicaSpec(
-            replicas=num_worker_replicas,
+            replicas=num_workers - 1,
             template=worker_pod_template_spec,
         )

     return pytorchjob


 def get_pvc_spec(
-    pvc_name: str, namespace: str, storage_size: str, storage_class: str = None
+    pvc_name: str, namespace: str, storage_size: str, storage_class: Optional[str]
 ):
     if pvc_name is None or namespace is None or storage_size is None:
         raise ValueError("One of the arguments is None")
```
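To make the new replica math concrete, a hedged sketch assuming the SDK from this commit is installed: with `num_workers=3` and no Master template, `get_pytorchjob_template` now builds one Master (reusing the Worker template, so the controller still sets RANK and WORLD_SIZE) plus `num_workers - 1 = 2` Workers, keeping the total Pod count equal to `num_workers`.

```python
# Minimal sketch; builds the PyTorchJob object locally, no cluster needed.
from kubernetes import client
from kubeflow.training.constants import constants
from kubeflow.training.utils import utils

worker_template = client.V1PodTemplateSpec(
    metadata=client.V1ObjectMeta(name="demo"),
    spec=client.V1PodSpec(
        containers=[client.V1Container(name="pytorch", image="docker.io/test-training")]
    ),
)

job = utils.get_pytorchjob_template(
    name="demo",
    namespace="default",
    num_workers=3,
    worker_pod_template_spec=worker_template,  # no Master template passed
)

specs = job.spec.pytorch_replica_specs
print(specs[constants.REPLICA_TYPE_MASTER].replicas)  # 1, from the Worker template
print(specs[constants.REPLICA_TYPE_WORKER].replicas)  # 2, i.e. num_workers - 1
```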

sdk/python/test/e2e/test_e2e_pytorchjob.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -181,13 +181,13 @@ def train_func():
         print(f"Start training for Epoch {i}")
         time.sleep(1)

-    num_workers = 1
+    num_workers = 3

     TRAINING_CLIENT.create_job(
         name=JOB_NAME,
         namespace=job_namespace,
         train_func=train_func,
-        num_worker_replicas=num_workers,
+        num_workers=num_workers,
     )

     logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
```
