Skip to content

Commit d56a20f

Browse files
committed
adapters/kfp: support distributed training
1 parent 85d83b1 commit d56a20f

File tree

6 files changed

+155
-34
lines changed

6 files changed

+155
-34
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,4 @@ dmypy.json
138138
cython_debug/
139139

140140
wordlist.dic
141+
pipeline.yaml

docs/papermill.sh

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@
77

88
WORK_DIR=/tmp/papermill
99

10-
set -e
10+
set -ex
1111
mkdir -p "$WORK_DIR"
12+
13+
# create empty master.tar.gz file and setup symlinks instead of pulling from
14+
# master so we can handle local changes
15+
tar -cJf "$WORK_DIR/master.tar.gz" -T /dev/null
16+
ROOT="$(pwd)/.."
17+
(cd "$WORK_DIR" && ln -s "$ROOT/torchx" . && ln -s "$ROOT/examples" .)
18+
1219
files="$(find "$(pwd)"/build -name '*.ipynb')"
1320
for file in $files
1421
do

docs/source/pipelines.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ torchx.pipelines.kfp
1313
.. currentmodule:: torchx.pipelines.kfp.adapter
1414

1515
.. autofunction:: container_from_app
16+
.. autofunction:: resource_from_app
1617
.. autofunction:: component_from_app
1718
.. autofunction:: component_spec_from_app
1819

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Distributed KubeFlow Pipelines Example
======================================

This is an example KFP pipeline that uses resource_from_app to launch a
distributed operator using the kubernetes/volcano job scheduler. This only works
in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
"""

import kfp
from torchx import specs
from torchx.pipelines.kfp.adapter import resource_from_app


def pipeline() -> None:
    # Start by declaring the TorchX AppDef for the component: one role that
    # runs three echo replicas.
    app_def = specs.AppDef(
        name="test-dist",
        roles=[
            specs.Role(
                name="dist-echo",
                image="alpine",
                entrypoint="/bin/echo",
                args=["hello dist!"],
                num_replicas=3,
            ),
        ],
    )

    # The resource_from_app adapter turns the TorchX AppDef into a KFP
    # Kubernetes resource operator definition and instantiates it in the
    # pipeline.
    op: kfp.dsl.BaseOp = resource_from_app(app_def, queue="test")


# %%
# To generate the pipeline definition file we need to call into the KFP compiler
# with our pipeline function.

kfp.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
    print(f.read())

# %%
# Once this has all run you should have a pipeline file (typically
# pipeline.yaml) that you can upload to your KFP cluster via the UI or
# a kfp.Client.
#
# See the
# `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/tutorials/sdk-examples/#examples>`_
# for more info on launching KFP pipelines.

# %%
# See the :ref:`Advanced KubeFlow Pipelines Example` for how to chain multiple
# components together and use builtin components.

torchx/pipelines/kfp/adapter.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
V1VolumeMount,
2323
V1EmptyDirVolumeSource,
2424
)
25+
from torchx.schedulers.kubernetes_scheduler import app_to_resource
2526
from torchx.specs import api
2627
from typing_extensions import Protocol
2728

@@ -229,3 +230,41 @@ def container_from_app(
229230
"""
230231
factory = component_from_app(app, ui_metadata)
231232
return factory(*args, **kwargs)
233+
234+
235+
def resource_from_app(
    app: api.AppDef,
    queue: str,
) -> dsl.ResourceOp:
    """
    Generate a KFP ResourceOp from the provided TorchX app. The op creates a
    Volcano job on Kubernetes so that distributed apps are gang-scheduled. See
    https://volcano.sh/en/docs/ for more info on Volcano and how to install it.

    Args:
        app: The torchx AppDef to adapt.
        queue: the Volcano queue to schedule the operator in.

    >>> import kfp
    >>> from torchx import specs
    >>> from torchx.pipelines.kfp.adapter import resource_from_app
    >>> app_def = specs.AppDef(
    ...     name="trainer",
    ...     roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)],
    ... )
    >>> def pipeline():
    ...     trainer = resource_from_app(app_def, queue="test")
    ...     print(trainer)
    >>> kfp.compiler.Compiler().compile(
    ...     pipeline_func=pipeline,
    ...     package_path="/tmp/pipeline.yaml",
    ... )
    {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 'name': 'trainer-2', ...}}
    """
    # Build the Volcano Job manifest once, then wrap it in a ResourceOp that
    # creates it and polls the job's phase for success/failure.
    job_resource = app_to_resource(app, queue)
    return dsl.ResourceOp(
        name=app.name,
        action="create",
        success_condition="status.state.phase = Completed",
        failure_condition="status.state.phase = Failed",
        k8s_resource=job_resource,
    )

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,43 @@ def role_to_pod(name: str, role: Role) -> "V1Pod":
164164
)
165165

166166

167+
def app_to_resource(app: AppDef, queue: str) -> Dict[str, object]:
    """
    Convert a TorchX AppDef into a Volcano Job resource dict.

    Every replica of every role becomes its own single-replica Volcano task so
    that each pod gets a unique ``{role.name}-{replica_id}`` name, its own
    retry policy, and macro substitutions (e.g. replica_id) applied per
    replica. The job-level ``maxRetry`` is the minimum of the roles'
    ``max_retries``.

    Args:
        app: the TorchX AppDef to convert.
        queue: the Volcano queue to submit the job to.

    Returns:
        A dict representing a ``batch.volcano.sh/v1alpha1`` Job suitable for
        submission to the Kubernetes API.
    """
    tasks = []
    # fix: the previous `for i, role in enumerate(...)` bound an index that
    # was never used; iterate over the roles directly.
    for role in app.roles:
        for replica_id in range(role.num_replicas):
            values = macros.Values(
                img_root="",
                app_id=macros.app_id,
                replica_id=str(replica_id),
            )
            name = f"{role.name}-{replica_id}"
            replica_role = values.apply(role)
            pod = role_to_pod(name, replica_role)
            tasks.append(
                {
                    "replicas": 1,
                    "name": name,
                    "template": pod,
                    "maxRetry": role.max_retries,
                    "policies": RETRY_POLICIES[role.retry_policy],
                }
            )

    # NOTE(review): min() raises ValueError if app.roles is empty; callers
    # appear to always supply at least one role — confirm upstream validation.
    job_retries = min(role.max_retries for role in app.roles)
    resource: Dict[str, object] = {
        "apiVersion": "batch.volcano.sh/v1alpha1",
        "kind": "Job",
        "metadata": {"generateName": f"{app.name}-"},
        "spec": {
            "schedulerName": "volcano",
            "queue": queue,
            "tasks": tasks,
            "maxRetry": job_retries,
        },
    }
    return resource
202+
203+
167204
@dataclass
168205
class KubernetesJob:
169206
resource: Dict[str, object]
@@ -237,39 +274,9 @@ def _submit_dryrun(
237274
self, app: AppDef, cfg: RunConfig
238275
) -> AppDryRunInfo[KubernetesJob]:
239276
queue = cfg.get("queue")
240-
tasks = []
241-
for i, role in enumerate(app.roles):
242-
for replica_id in range(role.num_replicas):
243-
values = macros.Values(
244-
img_root="",
245-
app_id=macros.app_id,
246-
replica_id=str(replica_id),
247-
)
248-
name = f"{role.name}-{replica_id}"
249-
replica_role = values.apply(role)
250-
pod = role_to_pod(name, replica_role)
251-
tasks.append(
252-
{
253-
"replicas": 1,
254-
"name": name,
255-
"template": pod,
256-
"maxRetry": role.max_retries,
257-
"policies": RETRY_POLICIES[role.retry_policy],
258-
}
259-
)
260-
261-
job_retries = min(role.max_retries for role in app.roles)
262-
resource: Dict[str, object] = {
263-
"apiVersion": "batch.volcano.sh/v1alpha1",
264-
"kind": "Job",
265-
"metadata": {"generateName": f"{app.name}-"},
266-
"spec": {
267-
"schedulerName": "volcano",
268-
"queue": queue,
269-
"tasks": tasks,
270-
"maxRetry": job_retries,
271-
},
272-
}
277+
if not isinstance(queue, str):
278+
raise TypeError(f"config value 'queue' must be a string, got {queue}")
279+
resource = app_to_resource(app, queue)
273280
req = KubernetesJob(resource=resource)
274281
info = AppDryRunInfo(req, repr)
275282
info._app = app

0 commit comments

Comments
 (0)