Commit f6907e8

d4l3k authored and facebook-github-bot committed
adapters/kfp: support distributed training (#109)
Summary: This adds a new `resource_from_app` KFP adapter that adapts an app into a KFP ResourceOp which launches it via the Volcano scheduler. It reuses the same code that creates resources for the Kubernetes scheduler and embeds the resulting resource inside a KFP pipeline. This isn't supported under KFP v2, since it interacts directly with Kubernetes resources/Volcano. It also requires Volcano to be installed on the cluster, which is why this is a separate adapter rather than being used automatically. This is still fairly experimental; once KFP has better distributed support we will likely want to rely on that instead, since the current UX is less than ideal: you need to use the CLI to access the individual worker logs, and there is no UI metadata support yet. UI metadata could probably be added by providing an output annotation for Argo as part of the resource, but I haven't looked into it.

Pull Request resolved: #109

Test Plan:

```
pyre
pytest
python dist_pipeline.py
```

http://5ab6bab9-istiosystem-istio-2af2-1926929629.us-west-2.elb.amazonaws.com/_/pipeline/#/runs/details/27707de9-bc67-42da-ab86-af2127ee54d1

![20210726_14h12m04s_grim](https://user-images.githubusercontent.com/909104/127059928-b4787429-e895-4b97-b53e-c6262e99c52b.png)

Reviewed By: kiukchung

Differential Revision: D29921246

Pulled By: d4l3k

fbshipit-source-id: ead6d0d4265cd22dd46fb89621a938c7281d7f5b
1 parent 9439d74 commit f6907e8

File tree

6 files changed: +168 -34 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions

    @@ -138,3 +138,4 @@ dmypy.json
     cython_debug/
     
     wordlist.dic
    +pipeline.yaml

docs/papermill.sh

Lines changed: 8 additions & 1 deletion

    @@ -7,8 +7,15 @@
     
     WORK_DIR=/tmp/papermill
     
    -set -e
    +set -ex
     mkdir -p "$WORK_DIR"
    +
    +# create empty master.tar.gz file and setup symlinks instead of pulling from
    +# master so we can handle local changes
    +tar -cJf "$WORK_DIR/master.tar.gz" -T /dev/null
    +ROOT="$(pwd)/.."
    +(cd "$WORK_DIR" && ln -s "$ROOT/torchx" . && ln -s "$ROOT/examples" .)
    +
     files="$(find "$(pwd)"/build -name '*.ipynb')"
     for file in $files
     do

docs/source/pipelines.rst

Lines changed: 1 addition & 0 deletions

    @@ -13,6 +13,7 @@ torchx.pipelines.kfp
     .. currentmodule:: torchx.pipelines.kfp.adapter
     
     .. autofunction:: container_from_app
    +.. autofunction:: resource_from_app
     .. autofunction:: component_from_app
     .. autofunction:: component_spec_from_app
     

Lines changed: 66 additions & 0 deletions (new file)

    @@ -0,0 +1,66 @@
    #!/usr/bin/env python3
    # Copyright (c) Facebook, Inc. and its affiliates.
    # All rights reserved.
    #
    # This source code is licensed under the BSD-style license found in the
    # LICENSE file in the root directory of this source tree.

    """
    Distributed KubeFlow Pipelines Example
    ======================================

    This is an example KFP pipeline that uses resource_from_app to launch a
    distributed operator using the kubernetes/volcano job scheduler. This only works
    in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
    """

    import kfp
    from torchx import specs
    from torchx.pipelines.kfp.adapter import resource_from_app


    def pipeline() -> None:
        # First we define our AppDef for the component: a single role that echoes
        # "hello dist!" from 3 replicas of an alpine container.
        echo_app = specs.AppDef(
            name="test-dist",
            roles=[
                specs.Role(
                    name="dist-echo",
                    image="alpine",
                    entrypoint="/bin/echo",
                    args=["hello dist!"],
                    num_replicas=3,
                ),
            ],
        )

        # To convert the TorchX AppDef into a KFP resource we use the
        # resource_from_app adapter. This generates a KFP Kubernetes resource
        # operator definition from the TorchX app def and instantiates it.
        echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="test")


    # %%
    # To generate the pipeline definition file we need to call into the KFP compiler
    # with our pipeline function.

    kfp.compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="pipeline.yaml",
    )

    with open("pipeline.yaml", "rt") as f:
        print(f.read())

    # %%
    # Once this has all run you should have a pipeline file (typically
    # pipeline.yaml) that you can upload to your KFP cluster via the UI or
    # a kfp.Client.
    #
    # See the
    # `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/tutorials/sdk-examples/#examples>`_
    # for more info on launching KFP pipelines.

    # %%
    # See the :ref:`Advanced KubeFlow Pipelines Example` for how to chain multiple
    # components together and use builtin components.
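Since the example ends by noting that the compiled pipeline file can be uploaded via the UI or a kfp.Client, here is a rough sketch of the programmatic route; the host URL and run name are placeholders and are not part of this commit:

```
import kfp

# Hypothetical client pointed at a KFP endpoint (placeholder URL).
client = kfp.Client(host="http://localhost:8080")

# Upload and run the compiled pipeline package produced by the example above.
result = client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={},
    run_name="torchx-dist-echo",
)
print(result.run_id)
```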

torchx/pipelines/kfp/adapter.py

Lines changed: 39 additions & 0 deletions

    @@ -22,6 +22,7 @@
         V1VolumeMount,
         V1EmptyDirVolumeSource,
     )
    +from torchx.schedulers.kubernetes_scheduler import app_to_resource
     from torchx.specs import api
     from typing_extensions import Protocol
     

    @@ -229,3 +230,41 @@ def container_from_app(
         """
         factory = component_from_app(app, ui_metadata)
         return factory(*args, **kwargs)
    +
    +
    +def resource_from_app(
    +    app: api.AppDef,
    +    queue: str,
    +) -> dsl.ResourceOp:
    +    """
    +    resource_from_app generates a KFP ResourceOp from the provided app that uses
    +    the Volcano job scheduler on Kubernetes to run distributed apps. See
    +    https://volcano.sh/en/docs/ for more info on Volcano and how to install.
    +
    +    Args:
    +        app: The torchx AppDef to adapt.
    +        queue: the Volcano queue to schedule the operator in.
    +
    +    >>> import kfp
    +    >>> from torchx import specs
    +    >>> from torchx.pipelines.kfp.adapter import resource_from_app
    +    >>> app_def = specs.AppDef(
    +    ...     name="trainer",
    +    ...     roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
    +    ... )
    +    >>> def pipeline():
    +    ...     trainer = resource_from_app(app_def, queue="test")
    +    ...     print(trainer)
    +    >>> kfp.compiler.Compiler().compile(
    +    ...     pipeline_func=pipeline,
    +    ...     package_path="/tmp/pipeline.yaml",
    +    ... )
    +    {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
    +    """
    +    return dsl.ResourceOp(
    +        name=app.name,
    +        action="create",
    +        success_condition="status.state.phase = Completed",
    +        failure_condition="status.state.phase = Failed",
    +        k8s_resource=app_to_resource(app, queue),
    +    )
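Because resource_from_app returns a regular KFP op, it can be sequenced with other steps in the same pipeline. A minimal sketch, assuming the existing container_from_app adapter accepts an AppDef on its own; the apps and image names here are hypothetical placeholders, not part of this diff:

```
import kfp
from torchx import specs
from torchx.pipelines.kfp.adapter import container_from_app, resource_from_app

# Hypothetical app definitions for illustration only.
trainer_app = specs.AppDef(
    name="trainer",
    roles=[specs.Role("trainer", image="example/trainer:latest", num_replicas=2)],
)
report_app = specs.AppDef(
    name="report",
    roles=[specs.Role("report", image="example/report:latest", num_replicas=1)],
)


def pipeline() -> None:
    # Distributed step: a Volcano Job created via a KFP ResourceOp.
    trainer = resource_from_app(trainer_app, queue="default")
    # Single-container step that should only start after the distributed step.
    report = container_from_app(report_app)
    report.after(trainer)


kfp.compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.yaml")
```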

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 53 additions & 33 deletions

    @@ -164,6 +164,56 @@ def role_to_pod(name: str, role: Role) -> "V1Pod":
         )
     
     
    +def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
    +    """
    +    app_to_resource creates a volcano job kubernetes resource definition from
    +    the provided AppDef. The resource definition can be used to launch the
    +    app on Kubernetes.
    +
    +    To support macros we generate one task per replica instead of using the
    +    volcano `replicas` field since macros change the arguments on a per
    +    replica basis.
    +
    +    Volcano has two levels of retries: one at the task level and one at the
    +    job level. When using the APPLICATION retry policy, the job level retry
    +    count is set to the minimum of the max_retries of the roles.
    +    """
    +    tasks = []
    +    for i, role in enumerate(app.roles):
    +        for replica_id in range(role.num_replicas):
    +            values = macros.Values(
    +                img_root="",
    +                app_id=macros.app_id,
    +                replica_id=str(replica_id),
    +            )
    +            name = f"{role.name}-{replica_id}"
    +            replica_role = values.apply(role)
    +            pod = role_to_pod(name, replica_role)
    +            tasks.append(
    +                {
    +                    "replicas": 1,
    +                    "name": name,
    +                    "template": pod,
    +                    "maxRetry": role.max_retries,
    +                    "policies": RETRY_POLICIES[role.retry_policy],
    +                }
    +            )
    +
    +    job_retries = min(role.max_retries for role in app.roles)
    +    resource: Dict[str, object] = {
    +        "apiVersion": "batch.volcano.sh/v1alpha1",
    +        "kind": "Job",
    +        "metadata": {"generateName": f"{app.name}-"},
    +        "spec": {
    +            "schedulerName": "volcano",
    +            "queue": queue,
    +            "tasks": tasks,
    +            "maxRetry": job_retries,
    +        },
    +    }
    +    return resource
    +
    +
     @dataclass
     class KubernetesJob:
         resource: Dict[str, object]

    @@ -237,39 +287,9 @@ def _submit_dryrun(
             self, app: AppDef, cfg: RunConfig
         ) -> AppDryRunInfo[KubernetesJob]:
             queue = cfg.get("queue")
    -        tasks = []
    -        for i, role in enumerate(app.roles):
    -            for replica_id in range(role.num_replicas):
    -                values = macros.Values(
    -                    img_root="",
    -                    app_id=macros.app_id,
    -                    replica_id=str(replica_id),
    -                )
    -                name = f"{role.name}-{replica_id}"
    -                replica_role = values.apply(role)
    -                pod = role_to_pod(name, replica_role)
    -                tasks.append(
    -                    {
    -                        "replicas": 1,
    -                        "name": name,
    -                        "template": pod,
    -                        "maxRetry": role.max_retries,
    -                        "policies": RETRY_POLICIES[role.retry_policy],
    -                    }
    -                )
    -
    -        job_retries = min(role.max_retries for role in app.roles)
    -        resource: Dict[str, object] = {
    -            "apiVersion": "batch.volcano.sh/v1alpha1",
    -            "kind": "Job",
    -            "metadata": {"generateName": f"{app.name}-"},
    -            "spec": {
    -                "schedulerName": "volcano",
    -                "queue": queue,
    -                "tasks": tasks,
    -                "maxRetry": job_retries,
    -            },
    -        }
    +        if not isinstance(queue, str):
    +            raise TypeError(f"config value 'queue' must be a string, got {queue}")
    +        resource = app_to_resource(app, queue)
             req = KubernetesJob(resource=resource)
             info = AppDryRunInfo(req, repr)
             info._app = app
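To make the docstring's description concrete (one Volcano task per replica, with the job-level maxRetry set to the minimum of the roles' max_retries), a small sketch like the following could be used to inspect the generated resource. The role names and retry counts are invented for the example, and running it assumes the kubernetes client package used by this scheduler module is installed:

```
from torchx import specs
from torchx.schedulers.kubernetes_scheduler import app_to_resource

# Two hypothetical roles with different retry budgets.
app = specs.AppDef(
    name="trainer",
    roles=[
        specs.Role("trainer", image="example:latest", num_replicas=2, max_retries=3),
        specs.Role("reader", image="example:latest", num_replicas=1, max_retries=5),
    ],
)

resource = app_to_resource(app, queue="default")
spec = resource["spec"]

# One task per replica: ['trainer-0', 'trainer-1', 'reader-0']
print([task["name"] for task in spec["tasks"]])
# Job-level retry count is the minimum across roles: 3
print(spec["maxRetry"])
```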
