From d509aa938907f2ba757a1681a976272ecd097f2b Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Tue, 16 Jan 2024 00:45:35 +0000 Subject: [PATCH 1/8] [SDK] Add resources for create Job API --- .../kubeflow/training/api/training_client.py | 142 ++++++------ .../kubeflow/training/constants/constants.py | 47 ++-- sdk/python/kubeflow/training/utils/utils.py | 208 ++++++++++-------- 3 files changed, 212 insertions(+), 185 deletions(-) diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 6ffd4fb898..487fa0be8f 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -25,11 +25,11 @@ from kubeflow.training.constants import constants from kubeflow.training.utils import utils from kubeflow.storage_initializer.constants import ( - INIT_CONTAINER_MOUNT_PATH, VOLUME_PATH_DATASET, VOLUME_PATH_MODEL, ) + logger = logging.getLogger(__name__) status_logger = utils.StatusLogger( @@ -139,47 +139,23 @@ def train( namespace = namespace or self.namespace - if isinstance(resources_per_worker, dict): - if "gpu" in resources_per_worker: - if resources_per_worker["gpu"] is not None and ( - num_procs_per_worker > resources_per_worker["gpu"] - ): - raise ValueError( - "Insufficient gpu resources allocated to the container." - ) - if resources_per_worker["gpu"] is not None: - resources_per_worker["nvidia.com/gpu"] = resources_per_worker.pop( - "gpu" - ) - - if ( - "cpu" not in resources_per_worker - or "memory" not in resources_per_worker - ): - raise ValueError("cpu and memory resources not specified") - - resources_per_worker = client.V1ResourceRequirements( - requests=resources_per_worker, - limits=resources_per_worker, - ) - try: self.core_api.create_namespaced_persistent_volume_claim( namespace=namespace, body=utils.get_pvc_spec( - pvc_name=constants.TRAINER_PVC_NAME, + pvc_name=constants.STORAGE_INITIALIZER, namespace=namespace, - storage_size=storage_config["size"], - storage_class=storage_config["storage_class"], + storage_config=storage_config, ), ) except Exception as e: pvc_list = self.core_api.list_namespaced_persistent_volume_claim(namespace) # Check if the PVC with the specified name exists for pvc in pvc_list.items: - if pvc.metadata.name == constants.TRAINER_PVC_NAME: + if pvc.metadata.name == constants.STORAGE_INITIALIZER: print( - f"PVC '{constants.TRAINER_PVC_NAME}' already exists in namespace '{namespace}'." + f"PVC '{constants.STORAGE_INITIALIZER}' already exists in namespace " + f"{namespace}." 
) break else: @@ -195,8 +171,8 @@ def train( # create init container spec init_container_spec = utils.get_container_spec( - name=constants.STORAGE_CONTAINER, - image=constants.STORAGE_CONTAINER_IMAGE, + name=constants.STORAGE_INITIALIZER, + base_image=constants.STORAGE_INITIALIZER_IMAGE, args=[ "--model_provider", mp, @@ -207,18 +183,13 @@ def train( "--dataset_provider_parameters", json.dumps(dataset_provider_parameters.__dict__), ], - volume_mounts=[ - models.V1VolumeMount( - name=constants.TRAINER_PV, - mount_path=INIT_CONTAINER_MOUNT_PATH, - ) - ], + volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT], ) # create app container spec container_spec = utils.get_container_spec( name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"], - image=constants.TRAINER_TRANSFORMER_IMAGE, + base_image=constants.TRAINER_TRANSFORMER_IMAGE, args=[ "--model_uri", model_provider_parameters.model_uri, @@ -235,41 +206,22 @@ def train( "--training_parameters", json.dumps(train_parameters.training_parameters.to_dict()), ], - volume_mounts=[ - models.V1VolumeMount( - name=constants.TRAINER_PV, - mount_path=INIT_CONTAINER_MOUNT_PATH, - ) - ], + volume_mounts=[constants.STORAGE_INITIALIZER_VOLUME_MOUNT], resources=resources_per_worker, ) # create worker pod spec worker_pod_template_spec = utils.get_pod_template_spec( - job_kind=constants.PYTORCHJOB_KIND, - containers_spec=[container_spec], - volumes_spec=[ - models.V1Volume( - name=constants.TRAINER_PV, - persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( - claim_name=constants.TRAINER_PVC_NAME - ), - ) - ], + containers=[container_spec], + init_containers=[init_container_spec], + volumes_spec=[constants.STORAGE_INITIALIZER_VOLUME], ) # create master pod spec master_pod_template_spec = utils.get_pod_template_spec( - job_kind=constants.PYTORCHJOB_KIND, - containers_spec=[init_container_spec, container_spec], - volumes_spec=[ - models.V1Volume( - name=constants.TRAINER_PV, - persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( - claim_name=constants.TRAINER_PVC_NAME - ), - ) - ], + containers=[container_spec], + init_containers=[init_container_spec], + volumes_spec=[constants.STORAGE_INITIALIZER_VOLUME], ) job = utils.get_pytorchjob_template( @@ -293,6 +245,7 @@ def create_job( train_func: Optional[Callable] = None, parameters: Optional[Dict[str, Any]] = None, num_workers: Optional[int] = None, + resources_per_worker: Union[dict, models.V1ResourceRequirements, None] = None, num_chief_replicas: Optional[int] = None, num_ps_replicas: Optional[int] = None, packages_to_install: Optional[List[str]] = None, @@ -324,6 +277,26 @@ def create_job( set, Base Image must support `bash` CLI to execute the training script. parameters: Dict of input parameters that training function might receive. num_workers: Number of Worker replicas for the Job. + resources_per_worker: A parameter that lets you specify how much + resources each Worker container should have. You can either specify a + kubernetes.client.V1ResourceRequirements object (documented here: + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ResourceRequirements.md) + or a dictionary that includes one or more of the following keys: + `cpu`, `memory`, or `gpu` (other keys will be ignored). Appropriate + values for these keys are documented here: + https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/. 
+                For example:
+                ```
+                {
+                    "cpu": "1",
+                    "memory": "2Gi",
+                    "gpu": "1",
+                }
+                ```
+                Please note, `gpu` specifies a resource request with a key of
+                `nvidia.com/gpu`, i.e. an NVIDIA GPU. If you need a different type
+                of GPU, pass in a V1ResourceRequirements instance instead, since it's
+                more flexible. This parameter is optional and defaults to None.
             num_chief_replicas: Number of Chief replicas for the TFJob. Number
                 of Chief replicas can't be more than 1.
             num_ps_replicas: Number of Parameter Server replicas for the TFJob.
@@ -353,7 +326,7 @@ def create_job(
         namespace = namespace or self.namespace
         job_kind = job_kind or self.job_kind
         if job is not None:
-            job_kind = job.kind
+            job_kind = str(job.kind)
 
         if job_kind not in constants.JOB_PARAMETERS:
             raise ValueError(
             )
 
         # If Training function or base image is set, configure Job template.
-        if train_func is not None or base_image is not None:
+        if job is None and (train_func is not None or base_image is not None):
             # Job name must be set to configure Job template.
             if name is None:
                 raise ValueError(
                     "Job name must be set to configure Job from function or image"
                 )
 
-            # Get Pod template spec from function or image.
-            pod_template_spec = utils.get_pod_template_spec(
-                job_kind=job_kind,
+            # Assign the default base image.
+            # TODO (andreyvelich): Add base image for other Job kinds.
+            if base_image is None:
+                base_image = constants.JOB_PARAMETERS[job_kind]["base_image"]
+
+            # Get Training Container template.
+            container_spec = utils.get_container_spec(
+                name=constants.JOB_PARAMETERS[job_kind]["container"],
                 base_image=base_image,
                 train_func=train_func,
-                parameters=parameters,
+                train_func_parameters=parameters,
                 packages_to_install=packages_to_install,
                 pip_index_url=pip_index_url,
+                resources=resources_per_worker,
+            )
+            # Get Pod template spec from function or image.
+            pod_template_spec = utils.get_pod_template_spec(
+                containers=[container_spec],
             )
 
         # Configure template for different Jobs.
@@ -403,8 +386,13 @@ def create_job(
             )
 
         # Verify Job object type.
-        if not isinstance(job, constants.JOB_MODELS):
-            raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
+        if not isinstance(
+            job,
+            getattr(models, constants.JOB_PARAMETERS[job_kind]["model"]),
+        ):
+            raise ValueError(
+                f"Job must be one of these types: {constants.JOB_MODELS}, but Job is: {type(job)}"
+            )
 
         # Create the Training Job.
         try:
             self.custom_api.create_namespaced_custom_object(
                 constants.GROUP,
                 constants.VERSION,
                 namespace,
-                constants.JOB_PARAMETERS[job.kind]["plural"],
+                constants.JOB_PARAMETERS[job_kind]["plural"],
                 job,
             )
         except multiprocessing.TimeoutError:
@@ -580,7 +568,9 @@ def get_job_conditions(
                 f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
             )
 
-        if job is not None and not isinstance(job, constants.JOB_MODELS):
+        if job is not None and not isinstance(
+            job, getattr(models, constants.JOB_PARAMETERS[job_kind]["model"])
+        ):
             raise ValueError(f"Job must be one of these types: {constants.JOB_MODELS}")
 
         # If Job is not set, get the Training Job.
@@ -1235,7 +1225,7 @@ def delete_job( name: str, namespace: Optional[str] = None, job_kind: Optional[str] = None, - delete_options: Optional[client.V1DeleteOptions] = None, + delete_options: Optional[models.V1DeleteOptions] = None, ): """Delete the Training Job diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index 83d3703ee8..a2c59fcbc6 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -13,7 +13,8 @@ # limitations under the License. from kubeflow.training import models -from typing import Union +from typing import Union, Dict +from kubeflow.storage_initializer.constants import INIT_CONTAINER_MOUNT_PATH # How long to wait in seconds for requests to the Kubernetes API Server. DEFAULT_TIMEOUT = 120 @@ -68,8 +69,25 @@ REPLICA_TYPE_SERVER = "Server" REPLICA_TYPE_LAUNCHER = "Launcher" +# Constants for Train API. +STORAGE_INITIALIZER = "storage-initializer" +STORAGE_INITIALIZER_IMAGE = "docker.io/kubeflow/storage-initializer" + +STORAGE_INITIALIZER_VOLUME_MOUNT = models.V1VolumeMount( + name=STORAGE_INITIALIZER, + mount_path=INIT_CONTAINER_MOUNT_PATH, +) +STORAGE_INITIALIZER_VOLUME = models.V1Volume( + name=STORAGE_INITIALIZER, + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=STORAGE_INITIALIZER + ), +) +TRAINER_TRANSFORMER_IMAGE = "docker.io/kubeflow/trainer-huggingface" + # TFJob constants. TFJOB_KIND = "TFJob" +TFJOB_MODEL = "KubeflowOrgV1TFJob" TFJOB_PLURAL = "tfjobs" TFJOB_CONTAINER = "tensorflow" TFJOB_REPLICA_TYPES = ( @@ -83,18 +101,15 @@ # PyTorchJob constants PYTORCHJOB_KIND = "PyTorchJob" +PYTORCHJOB_MODEL = "KubeflowOrgV1PyTorchJob" PYTORCHJOB_PLURAL = "pytorchjobs" PYTORCHJOB_CONTAINER = "pytorch" PYTORCHJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower()) -PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime" -STORAGE_CONTAINER = "storage-initializer" -STORAGE_CONTAINER_IMAGE = "docker.io/kubeflow/storage-initializer" -TRAINER_TRANSFORMER_IMAGE = "docker.io/kubeflow/trainer-huggingface" -TRAINER_PVC_NAME = "storage-initializer" -TRAINER_PV = "storage-pv" +PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime" # MXJob constants MXJOB_KIND = "MXJob" +MXJOB_MODEL = "KubeflowOrgV1MXJob" MXJOB_PLURAL = "mxjobs" MXJOB_CONTAINER = "mxnet" MXJOB_REPLICA_TYPES = ( @@ -105,18 +120,21 @@ # XGBoostJob constants XGBOOSTJOB_KIND = "XGBoostJob" +XGBOOSTJOB_MODEL = "KubeflowOrgV1XGBoostJob" XGBOOSTJOB_PLURAL = "xgboostjobs" XGBOOSTJOB_CONTAINER = "xgboost" XGBOOSTJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower()) # MPIJob constants MPIJOB_KIND = "MPIJob" +MPIJOB_MODEL = "KubeflowOrgV1MPIJob" MPIJOB_PLURAL = "mpijobs" MPIJOB_CONTAINER = "mpi" MPIJOB_REPLICA_TYPES = (REPLICA_TYPE_LAUNCHER.lower(), REPLICA_TYPE_WORKER.lower()) # PaddleJob constants PADDLEJOB_KIND = "PaddleJob" +PADDLEJOB_MODEL = "KubeflowOrgV1PaddleJob" PADDLEJOB_PLURAL = "paddlejobs" PADDLEJOB_CONTAINER = "paddle" PADDLEJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower()) @@ -129,34 +147,37 @@ # Dictionary to get plural, model, and container for each Job kind. 
JOB_PARAMETERS = { TFJOB_KIND: { - "model": models.KubeflowOrgV1TFJob, + "model": TFJOB_MODEL, "plural": TFJOB_PLURAL, "container": TFJOB_CONTAINER, "base_image": TFJOB_BASE_IMAGE, }, PYTORCHJOB_KIND: { - "model": models.KubeflowOrgV1PyTorchJob, + "model": PYTORCHJOB_MODEL, "plural": PYTORCHJOB_PLURAL, "container": PYTORCHJOB_CONTAINER, "base_image": PYTORCHJOB_BASE_IMAGE, }, MXJOB_KIND: { - "model": models.KubeflowOrgV1MXJob, + "model": MXJOB_MODEL, "plural": MXJOB_PLURAL, "container": MXJOB_CONTAINER, + "base_image": "TODO", }, XGBOOSTJOB_KIND: { - "model": models.KubeflowOrgV1XGBoostJob, + "model": XGBOOSTJOB_MODEL, "plural": XGBOOSTJOB_PLURAL, "container": XGBOOSTJOB_CONTAINER, + "base_image": "TODO", }, MPIJOB_KIND: { - "model": models.KubeflowOrgV1MPIJob, + "model": MPIJOB_MODEL, "plural": MPIJOB_PLURAL, "container": MPIJOB_CONTAINER, + "base_image": "TODO", }, PADDLEJOB_KIND: { - "model": models.KubeflowOrgV1PaddleJob, + "model": PADDLEJOB_MODEL, "plural": PADDLEJOB_PLURAL, "container": PADDLEJOB_CONTAINER, "base_image": PADDLEJOB_BASE_IMAGE, diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 85c0d69ec1..b03afaa381 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -17,7 +17,7 @@ import logging import textwrap import inspect -from typing import Optional, Callable, List, Dict, Any +from typing import Optional, Callable, List, Dict, Any, Tuple, Union import json import threading import queue @@ -127,122 +127,134 @@ def get_script_for_python_packages( return script_for_python_packages +def get_command_using_train_func( + train_func: Optional[Callable], + train_func_parameters: Optional[Dict[str, Any]] = None, + packages_to_install: Optional[List[str]] = None, + pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, +) -> Tuple[List[str], List[str]]: + """ + Get container args and command using the given training function and parameters. + """ + # Check if function is callable. + if not callable(train_func): + raise ValueError( + f"Training function must be callable, got function type: {type(train_func)}" + ) + + # Extract function implementation. + func_code = inspect.getsource(train_func) + + # Function might be defined in some indented scope (e.g. in another function). + # We need to dedent the function code. + func_code = textwrap.dedent(func_code) + + # Wrap function code to execute it from the file. For example: + # def train(parameters): + # print('Start Training...') + # train({'lr': 0.01}) + if train_func_parameters is None: + func_code = f"{func_code}\n{train_func.__name__}()\n" + else: + func_code = f"{func_code}\n{train_func.__name__}({train_func_parameters})\n" + + # Prepare execute script template. + exec_script = textwrap.dedent( + """ + program_path=$(mktemp -d) + read -r -d '' SCRIPT << EOM\n + {func_code} + EOM + printf "%s" \"$SCRIPT\" > \"$program_path/ephemeral_script.py\" + python3 -u \"$program_path/ephemeral_script.py\"""" + ) + + # Add function code to the execute script. + exec_script = exec_script.format(func_code=func_code) + + # Install Python packages if that is required. + if packages_to_install is not None: + exec_script = ( + get_script_for_python_packages(packages_to_install, pip_index_url) + + exec_script + ) + + # Return container command and args execution script to the container arguments. 
+ return ["bash", "-c"], [exec_script] + + def get_container_spec( name: str, - image: str, + base_image: str, + train_func: Optional[Callable] = None, + train_func_parameters: Optional[Dict[str, Any]] = None, + packages_to_install: Optional[List[str]] = None, + pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, args: Optional[List[str]] = None, - resources: Optional[models.V1ResourceRequirements] = None, + resources: Union[dict, models.V1ResourceRequirements, None] = None, volume_mounts: Optional[List[models.V1VolumeMount]] = None, ) -> models.V1Container: """ - get container spec for given name and image. + Get container spec for the given parameters. """ - if name is None or image is None: - raise ValueError("container name or image cannot be none") - container_spec = models.V1Container(name=name, image=image) - if args: + if name is None or base_image is None: + raise ValueError("Container name or base image cannot be none") + + # Create initial container spec. + container_spec = models.V1Container(name=name, image=base_image) + + # If Training function is set, convert training function to the container args and command. + if train_func is not None: + container_spec.command, container_spec.args = get_command_using_train_func( + train_func=train_func, + train_func_parameters=train_func_parameters, + packages_to_install=packages_to_install, + pip_index_url=pip_index_url, + ) + # Otherwise, get container args from the input. + else: container_spec.args = args - if resources: - container_spec.resources = resources + # Convert dict to the Kubernetes container resources if that is required. + if isinstance(resources, dict): + # Convert all keys in resources to lowercase. + resources = {k.lower(): v for k, v in resources.items()} + if "gpu" in resources: + resources["nvidia.com/gpu"] = resources.pop("gpu") - if volume_mounts: - container_spec.volume_mounts = volume_mounts + resources = models.V1ResourceRequirements( + requests=resources, + limits=resources, + ) + + # Assign the rest container spec. If the value is None, container doesn't have that spec. + container_spec.resources = resources + container_spec.volume_mounts = volume_mounts return container_spec def get_pod_template_spec( - job_kind: str, - base_image: Optional[str] = None, - train_func: Optional[Callable] = None, - parameters: Optional[Dict[str, Any]] = None, - packages_to_install: Optional[List[str]] = None, - pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, - init_containers_spec: Optional[List[models.V1Container]] = None, - containers_spec: Optional[List[models.V1Container]] = None, + containers: List[models.V1Container], + init_containers: Optional[List[models.V1Container]] = None, volumes_spec: Optional[List[models.V1Volume]] = None, -): +) -> models.V1PodTemplateSpec: """ - Get Pod template spec for the given function and base image. + Get Pod template spec for the given parameters. """ - # Assign the default base image. - # TODO (andreyvelich): Add base image for other Job kinds. - if base_image is None: - base_image = constants.JOB_PARAMETERS[job_kind]["base_image"] - - # Create Pod template spec. + # Create initial Pod template spec. 
pod_template_spec = models.V1PodTemplateSpec( metadata=models.V1ObjectMeta( annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} ), - spec=models.V1PodSpec( - containers=[ - get_container_spec( - name=constants.JOB_PARAMETERS[job_kind]["container"], - image=base_image, - ) - ] - ), + spec=models.V1PodSpec(containers=[containers]), ) - if containers_spec: - pod_template_spec.spec.containers = containers_spec - if init_containers_spec: - pod_template_spec.spec.init_containers = init_containers_spec - if volumes_spec: - pod_template_spec.spec.volumes = volumes_spec - - # If Training function is set, convert function to container execution script. - if train_func is not None: - # Check if function is callable. - if not callable(train_func): - raise ValueError( - f"Training function must be callable, got function type: {type(train_func)}" - ) - - # Extract function implementation. - func_code = inspect.getsource(train_func) - - # Function might be defined in some indented scope (e.g. in another function). - # We need to dedent the function code. - func_code = textwrap.dedent(func_code) - - # Wrap function code to execute it from the file. For example: - # def train(parameters): - # print('Start Training...') - # train({'lr': 0.01}) - if parameters is None: - func_code = f"{func_code}\n{train_func.__name__}()\n" - else: - func_code = f"{func_code}\n{train_func.__name__}({parameters})\n" - - # Prepare execute script template. - exec_script = textwrap.dedent( - """ - program_path=$(mktemp -d) - read -r -d '' SCRIPT << EOM\n - {func_code} - EOM - printf "%s" \"$SCRIPT\" > \"$program_path/ephemeral_script.py\" - python3 -u \"$program_path/ephemeral_script.py\"""" - ) - - # Add function code to the execute script. - exec_script = exec_script.format(func_code=func_code) - - # Install Python packages if that is required. - if packages_to_install is not None: - exec_script = ( - get_script_for_python_packages(packages_to_install, pip_index_url) - + exec_script - ) - - # Add execution script to container arguments. - pod_template_spec.spec.containers[0].command = ["bash", "-c"] - pod_template_spec.spec.containers[0].args = [exec_script] + # Assign the rest Pod spec. If the value is None, container doesn't have that spec. 
+ pod_template_spec.spec.init_containers = init_containers + pod_template_spec.spec.volumes = volumes_spec return pod_template_spec @@ -364,9 +376,11 @@ def get_pytorchjob_template( def get_pvc_spec( - pvc_name: str, namespace: str, storage_size: str, storage_class: Optional[str] + pvc_name: str, + namespace: str, + storage_config: Dict[str, Optional[str]], ): - if pvc_name is None or namespace is None or storage_size is None: + if pvc_name is None or namespace is None or "size" not in storage_config is None: raise ValueError("One of the arguments is None") pvc_spec = models.V1PersistentVolumeClaim( @@ -375,12 +389,14 @@ def get_pvc_spec( metadata={"name": pvc_name, "namepsace": namespace}, spec=models.V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce", "ReadOnlyMany"], - resources=models.V1ResourceRequirements(requests={"storage": storage_size}), + resources=models.V1ResourceRequirements( + requests={"storage": storage_config["size"]} + ), ), ) - if storage_class is not None: - pvc_spec.spec.storage_class_name = storage_class + if "storage_class" in storage_config: + pvc_spec.spec.storage_class_name = storage_config["storage_class"] return pvc_spec From fbb23fb2915439d2852c3b4b6f63c7b8a670e422 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Tue, 16 Jan 2024 00:51:45 +0000 Subject: [PATCH 2/8] Fix unbound var --- sdk/python/kubeflow/training/api/training_client.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 487fa0be8f..5947de91bc 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -163,11 +163,19 @@ def train( if isinstance(model_provider_parameters, HuggingFaceModelParams): mp = "hf" + else: + raise ValueError( + f"Invalid model provider parameters {model_provider_parameters}" + ) if isinstance(dataset_provider_parameters, S3DatasetParams): dp = "s3" elif isinstance(dataset_provider_parameters, HfDatasetParams): dp = "hf" + else: + raise ValueError( + f"Invalid dataset provider parameters {dataset_provider_parameters}" + ) # create init container spec init_container_spec = utils.get_container_spec( From 64039fcc4d1be440170b104f601ff448b116e985 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Tue, 16 Jan 2024 13:33:55 +0000 Subject: [PATCH 3/8] Assign values in get pod template --- .../kubeflow/training/api/training_client.py | 7 +++-- sdk/python/kubeflow/training/utils/utils.py | 30 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 5947de91bc..89f4e4c3af 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -222,14 +222,14 @@ def train( worker_pod_template_spec = utils.get_pod_template_spec( containers=[container_spec], init_containers=[init_container_spec], - volumes_spec=[constants.STORAGE_INITIALIZER_VOLUME], + volumes=[constants.STORAGE_INITIALIZER_VOLUME], ) # create master pod spec master_pod_template_spec = utils.get_pod_template_spec( containers=[container_spec], init_containers=[init_container_spec], - volumes_spec=[constants.STORAGE_INITIALIZER_VOLUME], + volumes=[constants.STORAGE_INITIALIZER_VOLUME], ) job = utils.get_pytorchjob_template( @@ -364,7 +364,8 @@ def create_job( pip_index_url=pip_index_url, resources=resources_per_worker, ) - # 
Get Pod template spec from function or image. + + # Get Pod template spec using the above container. pod_template_spec = utils.get_pod_template_spec( containers=[container_spec], ) diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index b03afaa381..663b311699 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -134,7 +134,7 @@ def get_command_using_train_func( pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, ) -> Tuple[List[str], List[str]]: """ - Get container args and command using the given training function and parameters. + Get container args and command from the given training function and parameters. """ # Check if function is callable. if not callable(train_func): @@ -179,7 +179,7 @@ def get_command_using_train_func( + exec_script ) - # Return container command and args execution script to the container arguments. + # Return container command and args to execute training function. return ["bash", "-c"], [exec_script] @@ -202,9 +202,11 @@ def get_container_spec( raise ValueError("Container name or base image cannot be none") # Create initial container spec. - container_spec = models.V1Container(name=name, image=base_image) + container_spec = models.V1Container( + name=name, image=base_image, args=args, volume_mounts=volume_mounts + ) - # If Training function is set, convert training function to the container args and command. + # If training function is set, override container command and args to execute the function. if train_func is not None: container_spec.command, container_spec.args = get_command_using_train_func( train_func=train_func, @@ -212,9 +214,6 @@ def get_container_spec( packages_to_install=packages_to_install, pip_index_url=pip_index_url, ) - # Otherwise, get container args from the input. - else: - container_spec.args = args # Convert dict to the Kubernetes container resources if that is required. if isinstance(resources, dict): @@ -228,9 +227,8 @@ def get_container_spec( limits=resources, ) - # Assign the rest container spec. If the value is None, container doesn't have that spec. + # Add resources to the container spec. container_spec.resources = resources - container_spec.volume_mounts = volume_mounts return container_spec @@ -238,24 +236,24 @@ def get_container_spec( def get_pod_template_spec( containers: List[models.V1Container], init_containers: Optional[List[models.V1Container]] = None, - volumes_spec: Optional[List[models.V1Volume]] = None, + volumes: Optional[List[models.V1Volume]] = None, ) -> models.V1PodTemplateSpec: """ Get Pod template spec for the given parameters. """ - # Create initial Pod template spec. + # Create Pod template spec. If the value is None, Pod doesn't have that parameter pod_template_spec = models.V1PodTemplateSpec( metadata=models.V1ObjectMeta( annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} ), - spec=models.V1PodSpec(containers=[containers]), + spec=models.V1PodSpec( + init_containers=init_containers, + containers=containers, + volumes=volumes, + ), ) - # Assign the rest Pod spec. If the value is None, container doesn't have that spec. 
- pod_template_spec.spec.init_containers = init_containers - pod_template_spec.spec.volumes = volumes_spec - return pod_template_spec From 5ffb32a98ed63b5c8d07279a9ff55e66c4d48c88 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Tue, 16 Jan 2024 20:30:06 +0000 Subject: [PATCH 4/8] Add torchrun issue --- sdk/python/kubeflow/training/api/training_client.py | 2 ++ sdk/python/kubeflow/training/utils/utils.py | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index 89f4e4c3af..bbdc353ee7 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -139,6 +139,8 @@ def train( namespace = namespace or self.namespace + # TODO (andreyvelich): PVC Creation should be part of Training Operator Controller. + # Ref issue: https://github.com/kubeflow/training-operator/issues/1971 try: self.core_api.create_namespaced_persistent_volume_claim( namespace=namespace, diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 663b311699..f83c38ed4d 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -322,7 +322,7 @@ def get_pytorchjob_template( # Check if at least one Worker is set. # TODO (andreyvelich): Remove this check once we have CEL validation. # Ref: https://github.com/kubeflow/training-operator/issues/1708 - if num_workers is None or num_workers < 0: + if num_workers is None or num_workers < 1: raise ValueError("At least one Worker for PyTorchJob must be set") # Create PyTorchJob template. @@ -333,13 +333,13 @@ def get_pytorchjob_template( spec=models.KubeflowOrgV1PyTorchJobSpec( run_policy=models.KubeflowOrgV1RunPolicy(clean_pod_policy=None), pytorch_replica_specs={}, + elastic_policy=elastic_policy, ), ) + # TODO (andreyvelich): Should we make spec.nproc_per_node int ? if num_procs_per_worker: pytorchjob.spec.nproc_per_node = str(num_procs_per_worker) - if elastic_policy: - pytorchjob.spec.elastic_policy = elastic_policy # Create Master replica if that is set. if master_pod_template_spec: @@ -361,7 +361,8 @@ def get_pytorchjob_template( # Create Worker with num_workers - 1 replicas. # TODO (andreyvelich): Investigate if we can run PyTorchJob without the Master # Currently, if Master is not set, Training Operator controller - # doesn't set RANK and WORLD_SIZE for PyTorchJob + # doesn't set RANK and WORLD_SIZE for PyTorchJob. 
+    # Ref issue: https://github.com/kubeflow/training-operator/issues/1991
     if num_workers > 1:
         pytorchjob.spec.pytorch_replica_specs[
             constants.REPLICA_TYPE_WORKER

From 5031fee6aa47d75a08c967793bfe5a964188e5ae Mon Sep 17 00:00:00 2001
From: Andrey Velichkevich
Date: Tue, 16 Jan 2024 20:46:55 +0000
Subject: [PATCH 5/8] Test to create PyTorchJob from Image

---
 .../training/api/training_client_test.py   | 19 +++++++------
 sdk/python/test/e2e/test_e2e_pytorchjob.py | 28 +++++++++++++++++++
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py
index 5527fca9c4..e4d76b2766 100644
--- a/sdk/python/kubeflow/training/api/training_client_test.py
+++ b/sdk/python/kubeflow/training/api/training_client_test.py
@@ -123,15 +123,6 @@ def __init__(self, kind) -> None:
         },
         ValueError,
     ),
-    (
-        "invalid pod template spec parameters",
-        {
-            "name": "test job",
-            "train_func": lambda: "test train function",
-            "job_kind": constants.MXJOB_KIND,
-        },
-        KeyError,
-    ),
     (
         "paddle job can't be created using function",
         {
@@ -174,6 +165,16 @@ def __init__(self, kind) -> None:
         },
         "success",
     ),
+    (
+        "valid flow to create job using image",
+        {
+            "name": "test-job",
+            "namespace": "test",
+            "base_image": "docker.io/test-training",
+            "num_workers": 2,
+        },
+        "success",
+    ),
 ]
 
 
diff --git a/sdk/python/test/e2e/test_e2e_pytorchjob.py b/sdk/python/test/e2e/test_e2e_pytorchjob.py
index 92c6ae9764..51fb456b4e 100644
--- a/sdk/python/test/e2e/test_e2e_pytorchjob.py
+++ b/sdk/python/test/e2e/test_e2e_pytorchjob.py
@@ -212,6 +212,34 @@ def train_func():
     TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
 
 
+@pytest.mark.skipif(
+    GANG_SCHEDULER_NAME in GANG_SCHEDULERS,
+    reason="For plain scheduling",
+)
+def test_sdk_e2e_create_from_image(job_namespace):
+    JOB_NAME = "pytorchjob-from-image"
+
+    TRAINING_CLIENT.create_job(
+        name=JOB_NAME,
+        namespace=job_namespace,
+        base_image="docker.io/hello-world",
+        num_workers=1,
+    )
+
+    logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
+    logging.info(TRAINING_CLIENT.list_jobs(job_namespace))
+
+    try:
+        utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
+    except Exception as e:
+        utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
+        TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
+        raise Exception(f"PyTorchJob create from image E2E fails. Exception: {e}")
+
+    utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
+    TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
+
+
 def generate_pytorchjob(
     job_namespace: str,
     job_name: str,

From f8dfdc155f429517f9d5dc9d8da5192737ba3912 Mon Sep 17 00:00:00 2001
From: Andrey Velichkevich
Date: Tue, 16 Jan 2024 21:53:23 +0000
Subject: [PATCH 6/8] Fix e2e to create from image

---
 sdk/python/test/e2e/utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdk/python/test/e2e/utils.py b/sdk/python/test/e2e/utils.py
index a52393bbfc..b527ab8fec 100644
--- a/sdk/python/test/e2e/utils.py
+++ b/sdk/python/test/e2e/utils.py
@@ -47,7 +47,8 @@ def verify_job_e2e(
 
     # Job should have Created, Running, and Succeeded conditions.
     conditions = client.get_job_conditions(job=job)
-    if len(conditions) != 3:
+    # If the Job completes quickly, it has 2 conditions: Created and Succeeded.
+    if len(conditions) != 3 and len(conditions) != 2:
         raise Exception(f"{client.job_kind} conditions are invalid: {conditions}")
 
     # Job should have correct conditions.
From 3dd7ab7f26f9baa982f968cf6695a1b24707b9e7 Mon Sep 17 00:00:00 2001
From: Andrey Velichkevich
Date: Wed, 17 Jan 2024 14:03:07 +0000
Subject: [PATCH 7/8] Fix storage_config size check in get_pvc_spec

---
 sdk/python/kubeflow/training/utils/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py
index f83c38ed4d..5480044fa9 100644
--- a/sdk/python/kubeflow/training/utils/utils.py
+++ b/sdk/python/kubeflow/training/utils/utils.py
@@ -379,7 +379,7 @@ def get_pvc_spec(
     namespace: str,
     storage_config: Dict[str, Optional[str]],
 ):
-    if pvc_name is None or namespace is None or "size" not in storage_config is None:
+    if pvc_name is None or namespace is None or "size" not in storage_config:
         raise ValueError("One of the arguments is None")
 
     pvc_spec = models.V1PersistentVolumeClaim(

From 174b050cce88342b29bf3e098a67e5afa9d3fb9a Mon Sep 17 00:00:00 2001
From: Andrey Velichkevich
Date: Thu, 18 Jan 2024 14:54:14 +0000
Subject: [PATCH 8/8] Modify test conditions check

---
 sdk/python/test/e2e/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/test/e2e/utils.py b/sdk/python/test/e2e/utils.py
index b527ab8fec..7a6f81f922 100644
--- a/sdk/python/test/e2e/utils.py
+++ b/sdk/python/test/e2e/utils.py
@@ -48,7 +48,7 @@ def verify_job_e2e(
     # Job should have Created, Running, and Succeeded conditions.
     conditions = client.get_job_conditions(job=job)
     # If the Job completes quickly, it has 2 conditions: Created and Succeeded.
-    if len(conditions) != 3 and len(conditions) != 2:
+    if len(conditions) < 2:
         raise Exception(f"{client.job_kind} conditions are invalid: {conditions}")
 
     # Job should have correct conditions.