Add synchronous task scheduler to cudf-polars #18519
base: branch-25.06
# Keys must be distinct elements of the task.


def istask(x: Any) -> bool:
This is mostly orthogonal to your PR, but I'd be quite happy to adopt the concrete Task classes like dask has. Especially because (IIUC) we're the only one building these tasks, via generate_ir_tasks.
Yeah exactly - We can adopt our own Task class as long as we can transform it into something Dask-compatible for distributed execution.
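As a rough illustration of the idea discussed above, here is a minimal sketch of a concrete task class that can be lowered to the tuple-style `(func, *args)` graph format that Dask has traditionally accepted. The names `Task`, `to_legacy`, and `to_legacy_graph` are hypothetical and are not taken from cudf-polars or Dask.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Task:
    """Hypothetical concrete task: a key, a callable, and its arguments."""

    key: str
    func: Callable[..., Any]
    args: tuple[Any, ...] = ()

    def to_legacy(self) -> tuple:
        """Return the tuple-style task ``(func, *args)``."""
        return (self.func, *self.args)


def to_legacy_graph(tasks: list[Task]) -> dict[str, tuple]:
    """Build a ``{key: (func, *args)}`` mapping from Task objects."""
    return {t.key: t.to_legacy() for t in tasks}


def add(x: int, y: int) -> int:
    return x + y


# Task "b" refers to the result of task "a" by its key.
tasks = [Task("a", int, ("1",)), Task("b", add, ("a", 2))]
graph = to_legacy_graph(tasks)
```

Keeping the conversion in one place means the in-process representation can change without affecting whatever the distributed path expects.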
            return cache[arg]
        else:
            return arg
    except TypeError:
Do you know when this happens?
I added a comment - Since _execute_task is called on every element of a proper task (to find executed keys), we end up calling it on un-hashable objects (e.g. lists and dicts). The TypeError just means we have an un-hashable argument in the task.
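The behaviour described in this reply can be sketched as follows. This is a simplified stand-in, not the code from the PR: `execute_task_sketch` and this `istask` definition are assumptions written for illustration. The point is that `arg in cache` raises `TypeError` for un-hashable arguments, which are then passed through unchanged.

```python
from typing import Any


def istask(x: Any) -> bool:
    """Assumed definition: a non-empty tuple whose first element is callable."""
    return isinstance(x, tuple) and bool(x) and callable(x[0])


def execute_task_sketch(arg: Any, cache: dict) -> Any:
    """Recursively evaluate ``arg``, substituting already-computed keys."""
    if istask(arg):
        func, *args = arg
        return func(*(execute_task_sketch(a, cache) for a in args))
    try:
        # Hashable arguments may be keys of previously executed tasks.
        if arg in cache:
            return cache[arg]
        else:
            return arg
    except TypeError:
        # Un-hashable arguments (e.g. lists and dicts) cannot be keys,
        # so they are returned as-is.
        return arg


cache = {"x": 10}
# The list is un-hashable and passes through; "x" is replaced by 10.
result = execute_task_sketch((sum, [1, 2, 3], "x"), cache)
```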
from typing import TypeAlias


Key: TypeAlias = str | tuple[Union["Key", int], ...]
Just making sure I understand this type: what are our allowed Keys?
- str like 'groupby-<hash>'
- Tuple like ('groupby-<hash>', 0)
Do we need to accept both of those? Or can we use just the tuple form?
And is the second member of the Union correct? That it's either a Key or int? Should this maybe be
Key: TypeAlias = str | tuple[str, int]
(if we need both forms, and if I understood what's allowed here)
> Do we need to accept both of those? Or can we use just the tuple form?
Our final key is always a str for now.
> Should this maybe be...
Yeah, I was thinking we could have keys like tuple[str, int, int], but we could certainly be more strict.
Using Key: TypeAlias = str | tuple[str, int] for now.
Thanks for this. Looks good to me.
Last question: maybe it'd be useful to allow get_scheduler() to still return the dask scheduler. Rather than hard-coding dask-synchronous or something, maybe #18516 (or a followup) could allow the scheduler to be a callable, in addition to named ones like distributed and synchronous.
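One possible shape for this suggestion is sketched below. It is an assumption, not the signature of the real `get_scheduler()`: the registry `NAMED_SCHEDULERS`, the stand-in `toy_synchronous`, and the `(graph, key)` calling convention are all hypothetical. Only a synchronous entry is registered here; a distributed one could be registered in the same way.

```python
from typing import Any, Callable, Mapping, Union

# Hypothetical scheduler signature: takes a graph and a key, returns a result.
Scheduler = Callable[[Mapping[str, Any], str], Any]


def toy_synchronous(graph: Mapping[str, Any], key: str) -> Any:
    """Stand-in scheduler that runs one dependency-free task."""
    func, *args = graph[key]
    return func(*args)


# Hypothetical registry of named schedulers.
NAMED_SCHEDULERS: dict[str, Scheduler] = {"synchronous": toy_synchronous}


def get_scheduler(scheduler: Union[str, Scheduler] = "synchronous") -> Scheduler:
    """Resolve a scheduler given either a registered name or a callable."""
    if callable(scheduler):
        # A user-supplied callable (e.g. a Dask scheduler) is used as-is.
        return scheduler
    try:
        return NAMED_SCHEDULERS[scheduler]
    except KeyError:
        raise ValueError(f"Unknown scheduler: {scheduler!r}") from None


def custom(graph: Mapping[str, Any], key: str) -> Any:
    return "custom result"


by_name = get_scheduler("synchronous")
by_callable = get_scheduler(custom)
```

Accepting a callable keeps any Dask-specific choice in user code rather than in a hard-coded name.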
Description
The streaming executor in cudf-polars currently relies on Dask for both "synchronous" and "distributed" task scheduling. This PR introduces a synchronous task scheduler to cudf-polars so that the user doesn't need to have Dask installed unless they want to run across multiple GPUs.
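For readers unfamiliar with the idea, a synchronous scheduler over a `{key: (func, *args)}` graph can be sketched with the standard library alone. This is an illustrative sketch under stated assumptions, not the scheduler added by this PR: `run_synchronously` is a hypothetical name, keys are assumed to be strings, and every task in the graph is executed, not just those the requested key needs.

```python
from graphlib import TopologicalSorter
from typing import Any


def run_synchronously(graph: dict[str, tuple], key: str) -> Any:
    """Execute every task in dependency order and return the result for ``key``."""
    # A task depends on each argument that is itself a key in the graph.
    deps = {
        k: {a for a in task[1:] if isinstance(a, str) and a in graph}
        for k, task in graph.items()
    }
    cache: dict[str, Any] = {}
    # static_order() yields each key only after all of its dependencies.
    for k in TopologicalSorter(deps).static_order():
        func, *args = graph[k]
        resolved = [
            cache[a] if isinstance(a, str) and a in cache else a for a in args
        ]
        cache[k] = func(*resolved)
    return cache[key]


graph = {
    "load": (list, (3, 1, 2)),
    "sorted": (sorted, "load"),
    "total": (sum, "sorted"),
}
total = run_synchronously(graph, "total")
```

Everything runs in the calling process, one task at a time, so no Dask installation is involved in this path.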
Checklist