-
Notifications
You must be signed in to change notification settings - Fork 125
add additional documentation for the with_overrides feature #1181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
7c906a5
4236710
a46a880
25b9ad4
3e9cb84
3a35218
0806462
2a294b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
# | ||
# For either a request or limit, refer to the {py:class}`flytekit:flytekit.Resources` documentation. | ||
# | ||
# | ||
# The following attributes can be specified for a `Resource`. | ||
# | ||
# 1. `cpu` | ||
|
@@ -85,51 +86,64 @@ def my_workflow(x: typing.List[int]) -> int: | |
# | ||
# ## Using `with_overrides` | ||
# | ||
# You can use the `with_overrides` method to override the resources allocated to the tasks dynamically. | ||
# Let's understand how the resources can be initialized with an example. | ||
# Tasks can also have task_config which provides configuration for a specific task types. For task_config, refer to the {py:func}`flytekit:flytekit.task` documentation. | ||
# | ||
# You can use the `with_overrides` method to override the resources and task_config allocated to the tasks dynamically. | ||
# Let's understand how the resources can be initialized and override with an example. | ||
|
||
# %% [markdown] | ||
# Import the dependencies. | ||
# %% | ||
import typing # noqa: E402 | ||
|
||
from flytekit import Resources, task, workflow # noqa: E402 | ||
from flytekit import Resources, task, workflow, dynamic # noqa: E402 | ||
from flytekitplugins.kftensorflow import PS, Chief, TfJob, Worker # noqa: E402 | ||
|
||
|
||
# %% [markdown] | ||
# Define a task and configure the resources to be allocated to it. | ||
# You can use tasks decorated with memory and storage hints like regular tasks in a workflow. | ||
# You can use tasks decorated with memory and storage hints like regular tasks in a workflow, or configuration for an {py:class}`flytekitplugins:flytekitplugins.kftensorflow.TfJob` that can run distributed TensorFlow training on Kubernetes. | ||
# %% | ||
@task(requests=Resources(cpu="1", mem="200Mi"), limits=Resources(cpu="2", mem="350Mi")) | ||
def count_unique_numbers_1(x: typing.List[int]) -> int: | ||
s = set() | ||
for i in x: | ||
s.add(i) | ||
return len(s) | ||
|
||
@task( | ||
task_config=TfJob( | ||
num_workers=1, | ||
num_ps_replicas=1, | ||
num_chief_replicas=1, | ||
), | ||
requests=Resources(cpu="1", mem="200Mi"), | ||
limits=Resources(cpu="2", mem="350Mi"), | ||
) | ||
def run_tfjob() -> str: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This task shouldn't just return a string — it should actually showcase a TF operation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please explain it? If I understand correctly, are you suggesting that I should providing a detailed description of the entire TensorFlow training code, as this might overshadow the importance of discussing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right. Could you then include this as part of a note but not as a code block? Not a code block because we aren't including a full-fledged code snippet. |
||
|
||
return "hello world" | ||
|
||
# %% [markdown] | ||
# Define a task that computes the square of a number. | ||
# The `with_overrides` method overrides the old resource allocations. | ||
# %% | ||
@task | ||
def square_1(x: int) -> int: | ||
return x * x | ||
@workflow | ||
def my_run() -> str: | ||
return run_tfjob().with_overrides(limits=Resources(cpu="6", mem="500Mi")) | ||
|
||
|
||
# %% [markdown] | ||
# The `with_overrides` method overrides the old resource allocations. | ||
# Or you can use `@dynamic` to generate tasks at runtime with any custom configurations you want. | ||
# %% | ||
@dynamic | ||
def dynamic_run(num_workers: int) -> str: | ||
return run_tfjob().with_overrides(task_config=TfJob( | ||
num_workers=num_workers, | ||
num_ps_replicas=1, | ||
num_chief_replicas=1)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Include a new line. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please format the code using black and isort. |
||
@workflow | ||
def my_pipeline(x: typing.List[int]) -> int: | ||
return square_1(x=count_unique_numbers_1(x=x)).with_overrides(limits=Resources(cpu="6", mem="500Mi")) | ||
|
||
def start_dynamic_run(new_num_workers: int) -> str: | ||
return dynamic_run(num_workers=new_num_workers) | ||
|
||
# %% [markdown] | ||
# You can execute the workflow locally. | ||
# %% | ||
if __name__ == "__main__": | ||
print(count_unique_numbers_1(x=[1, 1, 2])) | ||
print(my_pipeline(x=[1, 1, 2])) | ||
print(f"Running my_run(): {my_run()}") | ||
print(f"Running dynamic_run(num_workers=4): {start_dynamic_run(new_num_workers=4)}") | ||
|
||
# %% [markdown] | ||
# You can see the memory allocation below. The memory limit is `500Mi` rather than `350Mi`, and the | ||
|
Uh oh!
There was an error while loading. Please reload this page.