diff --git a/.gitignore b/.gitignore
index 2e7f3ea8d..690188c2f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -138,3 +138,4 @@ dmypy.json
 cython_debug/
 
 wordlist.dic
+pipeline.yaml
diff --git a/docs/papermill.sh b/docs/papermill.sh
index 5c84499c5..5c2c29011 100755
--- a/docs/papermill.sh
+++ b/docs/papermill.sh
@@ -7,8 +7,15 @@
 WORK_DIR=/tmp/papermill
 
-set -e
+set -ex
 
 mkdir -p "$WORK_DIR"
+
+# create an empty master.tar.gz file and set up symlinks instead of pulling
+# from master so we can handle local changes
+tar -cJf "$WORK_DIR/master.tar.gz" -T /dev/null
+ROOT="$(pwd)/.."
+(cd "$WORK_DIR" && ln -s "$ROOT/torchx" . && ln -s "$ROOT/examples" .)
+
 files="$(find "$(pwd)"/build -name '*.ipynb')"
 for file in $files
 do
diff --git a/docs/source/pipelines.rst b/docs/source/pipelines.rst
index 9d1569192..6d7935888 100644
--- a/docs/source/pipelines.rst
+++ b/docs/source/pipelines.rst
@@ -13,6 +13,7 @@ torchx.pipelines.kfp
 .. currentmodule:: torchx.pipelines.kfp.adapter
 
 .. autofunction:: container_from_app
+.. autofunction:: resource_from_app
 .. autofunction:: component_from_app
 .. autofunction:: component_spec_from_app
 
diff --git a/examples/pipelines/kfp/dist_pipeline.py b/examples/pipelines/kfp/dist_pipeline.py
new file mode 100644
index 000000000..dbaac4b29
--- /dev/null
+++ b/examples/pipelines/kfp/dist_pipeline.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+Distributed KubeFlow Pipelines Example
+======================================
+
+This is an example KFP pipeline that uses resource_from_app to launch a
+distributed operator using the Kubernetes/Volcano job scheduler. This only works
+in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
+"""
+
+import kfp
+
+from torchx import specs
+from torchx.pipelines.kfp.adapter import resource_from_app
+
+
+def pipeline() -> None:
+    # First we define our AppDef for the component.
+    echo_app = specs.AppDef(
+        name="test-dist",
+        roles=[
+            specs.Role(
+                name="dist-echo",
+                image="alpine",
+                entrypoint="/bin/echo",
+                args=["hello dist!"],
+                num_replicas=3,
+            ),
+        ],
+    )
+
+    # To convert the TorchX AppDef into a KFP operator we use
+    # the resource_from_app adapter. This generates a KFP Kubernetes
+    # resource operator definition from the TorchX app def and instantiates it.
+    echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="test")
+
+
+# %%
+# To generate the pipeline definition file we need to call into the KFP compiler
+# with our pipeline function.
+
+kfp.compiler.Compiler().compile(
+    pipeline_func=pipeline,
+    package_path="pipeline.yaml",
+)
+
+with open("pipeline.yaml", "rt") as f:
+    print(f.read())
+
+# %%
+# Once this has all run, you should have a pipeline file (typically
+# pipeline.yaml) that you can upload to your KFP cluster via the UI or
+# a kfp.Client.
+#
+# See the
+# `KFP SDK Examples`_
+# for more info on launching KFP pipelines.
+
+# %%
+# See the :ref:`Advanced KubeFlow Pipelines Example` for how to chain multiple
+# components together and use builtin components.
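Note: once pipeline.yaml has been compiled, it can also be submitted programmatically rather than through the UI. A minimal sketch using the KFP SDK's kfp.Client, where the host URL and run name are placeholder assumptions, not anything defined in this diff:

    import kfp

    # Placeholder endpoint; point this at your own KFP deployment.
    client = kfp.Client(host="http://localhost:8080")

    # Submit the compiled pipeline package as a one-off run.
    client.create_run_from_pipeline_package(
        "pipeline.yaml",
        arguments={},
        run_name="dist-echo-example",  # hypothetical run name
    )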
diff --git a/torchx/pipelines/kfp/adapter.py b/torchx/pipelines/kfp/adapter.py
index 577dc39c0..2d74b02a0 100644
--- a/torchx/pipelines/kfp/adapter.py
+++ b/torchx/pipelines/kfp/adapter.py
@@ -22,6 +22,7 @@
     V1VolumeMount,
     V1EmptyDirVolumeSource,
 )
+from torchx.schedulers.kubernetes_scheduler import app_to_resource
 from torchx.specs import api
 from typing_extensions import Protocol
 
@@ -229,3 +230,41 @@ def container_from_app(
     """
     factory = component_from_app(app, ui_metadata)
     return factory(*args, **kwargs)
+
+
+def resource_from_app(
+    app: api.AppDef,
+    queue: str,
+) -> dsl.ResourceOp:
+    """
+    resource_from_app generates a KFP ResourceOp from the provided app that uses
+    the Volcano job scheduler on Kubernetes to run distributed apps. See
+    https://volcano.sh/en/docs/ for more info on Volcano and how to install it.
+
+    Args:
+        app: the TorchX AppDef to adapt.
+        queue: the Volcano queue to schedule the operator in.
+
+    >>> import kfp
+    >>> from torchx import specs
+    >>> from torchx.pipelines.kfp.adapter import resource_from_app
+    >>> app_def = specs.AppDef(
+    ...     name="trainer",
+    ...     roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
+    ... )
+    >>> def pipeline():
+    ...     trainer = resource_from_app(app_def, queue="test")
+    ...     print(trainer)
+    >>> kfp.compiler.Compiler().compile(
+    ...     pipeline_func=pipeline,
+    ...     package_path="/tmp/pipeline.yaml",
+    ... )
+    {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
+    """
+    return dsl.ResourceOp(
+        name=app.name,
+        action="create",
+        success_condition="status.state.phase = Completed",
+        failure_condition="status.state.phase = Failed",
+        k8s_resource=app_to_resource(app, queue),
+    )
diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py
index 1ab56f526..d12ecd95d 100644
--- a/torchx/schedulers/kubernetes_scheduler.py
+++ b/torchx/schedulers/kubernetes_scheduler.py
@@ -164,6 +164,56 @@ def role_to_pod(name: str, role: Role) -> "V1Pod":
     )
 
 
+def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
+    """
+    app_to_resource creates a Volcano job Kubernetes resource definition from
+    the provided AppDef. The resource definition can be used to launch the
+    app on Kubernetes.
+
+    To support macros we generate one task per replica instead of using the
+    Volcano `replicas` field, since macros change the arguments on a
+    per-replica basis.
+
+    Volcano has two levels of retries: one at the task level and one at the
+    job level. When using the APPLICATION retry policy, the job-level retry
+    count is set to the minimum of the max_retries of the roles.
+ """ + tasks = [] + for i, role in enumerate(app.roles): + for replica_id in range(role.num_replicas): + values = macros.Values( + img_root="", + app_id=macros.app_id, + replica_id=str(replica_id), + ) + name = f"{role.name}-{replica_id}" + replica_role = values.apply(role) + pod = role_to_pod(name, replica_role) + tasks.append( + { + "replicas": 1, + "name": name, + "template": pod, + "maxRetry": role.max_retries, + "policies": RETRY_POLICIES[role.retry_policy], + } + ) + + job_retries = min(role.max_retries for role in app.roles) + resource: Dict[str, object] = { + "apiVersion": "batch.volcano.sh/v1alpha1", + "kind": "Job", + "metadata": {"generateName": f"{app.name}-"}, + "spec": { + "schedulerName": "volcano", + "queue": queue, + "tasks": tasks, + "maxRetry": job_retries, + }, + } + return resource + + @dataclass class KubernetesJob: resource: Dict[str, object] @@ -237,39 +287,9 @@ def _submit_dryrun( self, app: AppDef, cfg: RunConfig ) -> AppDryRunInfo[KubernetesJob]: queue = cfg.get("queue") - tasks = [] - for i, role in enumerate(app.roles): - for replica_id in range(role.num_replicas): - values = macros.Values( - img_root="", - app_id=macros.app_id, - replica_id=str(replica_id), - ) - name = f"{role.name}-{replica_id}" - replica_role = values.apply(role) - pod = role_to_pod(name, replica_role) - tasks.append( - { - "replicas": 1, - "name": name, - "template": pod, - "maxRetry": role.max_retries, - "policies": RETRY_POLICIES[role.retry_policy], - } - ) - - job_retries = min(role.max_retries for role in app.roles) - resource: Dict[str, object] = { - "apiVersion": "batch.volcano.sh/v1alpha1", - "kind": "Job", - "metadata": {"generateName": f"{app.name}-"}, - "spec": { - "schedulerName": "volcano", - "queue": queue, - "tasks": tasks, - "maxRetry": job_retries, - }, - } + if not isinstance(queue, str): + raise TypeError(f"config value 'queue' must be a string, got {queue}") + resource = app_to_resource(app, queue) req = KubernetesJob(resource=resource) info = AppDryRunInfo(req, repr) info._app = app