Add synchronous task scheduler to cudf-polars #18519

Open

rjzamora wants to merge 6 commits into branch-25.06

Conversation

rjzamora (Member)

Description

The streaming executor in cudf-polars currently relies on Dask for both "synchronous" and "distributed" task scheduling. This PR introduces a synchronous task scheduler to cudf-polars so that users don't need Dask installed unless they want to run across multiple GPUs.
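
For intuition, a synchronous scheduler for a dask-style graph can fit in a few dozen lines. The sketch below is purely illustrative (the name sync_get and the toy graph are hypothetical, not the code added in this PR): it resolves each key recursively and caches results so nothing is computed twice.

```python
from typing import Any


def sync_get(graph: dict, key: Any, cache: dict | None = None) -> Any:
    """Execute `key` (and its dependencies) from a dask-style graph, in process."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Recursively materialize arguments that are themselves graph keys.
        resolved = [
            sync_get(graph, a, cache)
            if isinstance(a, (str, tuple)) and a in graph
            else a
            for a in args
        ]
        result = func(*resolved)
    else:
        result = task  # a literal value stored directly in the graph
    cache[key] = result
    return result


# Toy usage: ("add", 0) depends on two literal chunks.
graph = {
    ("chunk", 0): 1,
    ("chunk", 1): 2,
    ("add", 0): (lambda x, y: x + y, ("chunk", 0), ("chunk", 1)),
}
assert sync_get(graph, ("add", 0)) == 3
```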

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora added labels: 2 - In Progress, improvement, non-breaking, cudf.polars (Apr 17, 2025)
@rjzamora self-assigned this (Apr 17, 2025)
@rjzamora requested a review from a team as a code owner (Apr 17, 2025, 18:56)
@github-actions bot added the Python label (Apr 17, 2025)
# Keys must be distinct elements of the task.


def istask(x: Any) -> bool:

Contributor:

This is mostly orthogonal to your PR, but I'd be quite happy to adopt concrete Task classes like Dask has, especially since (IIUC) we're the only ones building these tasks, via generate_ir_tasks.

Member Author (rjzamora):

Yeah, exactly: we can adopt our own Task class as long as we can transform it into something Dask-compatible for distributed execution.
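
For illustration, a concrete Task class along these lines might look like the following sketch (the class and its to_dask_tuple method are hypothetical, not dask's Task class or anything in this PR): it records the key, callable, and arguments, and can be lowered back to the classic (func, *args) tuple form that Dask schedulers accept.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class Task:
    """Hypothetical concrete task container, sketched for discussion only."""

    key: str | tuple[str, int]
    func: Callable[..., Any]
    args: tuple[Any, ...] = field(default_factory=tuple)

    def to_dask_tuple(self) -> tuple[Any, ...]:
        # Lower to the classic dask task form: (func, *args).
        return (self.func, *self.args)


# A Task like this could run directly in the synchronous scheduler, or be
# handed to Dask as {task.key: task.to_dask_tuple()} for distributed execution.
```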

return cache[arg]
else:
return arg
except TypeError:

Contributor:

Do you know when this happens?

Member Author (rjzamora):

I added a comment. Since _execute_task is called on every element of a proper task (to find already-executed keys), we end up calling it on un-hashable objects (e.g. lists and dicts). The TypeError just means we have an un-hashable argument in the task.
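
In other words, the lookup pattern being described is roughly the following simplified sketch (not the exact code in this diff):

```python
from typing import Any


def _execute_task(arg: Any, cache: dict) -> Any:
    """Simplified sketch of the behaviour described above."""
    if isinstance(arg, tuple) and arg and callable(arg[0]):
        # A nested task: run its function on recursively resolved arguments.
        func, *args = arg
        return func(*(_execute_task(a, cache) for a in args))
    try:
        if arg in cache:
            # `arg` is a key whose result was already computed.
            return cache[arg]
        else:
            return arg
    except TypeError:
        # `arg` is un-hashable (e.g. a list or dict embedded in the task),
        # so it cannot be a key; return it as a plain value.
        return arg
```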

from typing import TypeAlias, Union


Key: TypeAlias = str | tuple[Union["Key", int], ...]

Contributor:

Just making sure I understand this type: what are our allowed Keys?

  1. str like 'groupby-<hash>'
  2. Tuple like ('groupby-<hash>', 0)

Do we need to accept both of those? Or can we use just the tuple form?

And is the second member of the Union correct? That it's either a Key or int? Should this maybe be

Key: TypeAlias = str | tuple[str, int]

(if we need both forms, and if I understood what's allowed here)

Member Author (rjzamora):

> Do we need to accept both of those? Or can we use just the tuple form?

Our final key is always a str for now.

> Should this maybe be...

Yeah, I was thinking we could have keys like tuple[str, int, int], but we could certainly be more strict.

Member Author (rjzamora):

Using Key: TypeAlias = str | tuple[str, int] for now.
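
With that alias, the two accepted key shapes look like this (the key values are illustrative only):

```python
from typing import TypeAlias

Key: TypeAlias = str | tuple[str, int]

final_key: Key = "groupby-abc123"       # the final output key is a plain str
chunk_key: Key = ("groupby-abc123", 0)  # per-partition keys pair a name with an index
```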

@rjzamora added the 3 - Ready for Review label and removed the 2 - In Progress label (Apr 17, 2025)

@TomAugspurger (Contributor) left a comment:

Thanks for this. Looks good to me.

Last question: maybe it'd be useful to allow get_scheduler() to still return the dask scheduler. Rather than hard-coding dask-synchronous or something, maybe #18516 (or a followup) could allow the scheduler to be a callable, in addition to named ones like distributed and synchronous.
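
For illustration, that suggestion might look roughly like the sketch below (the get_scheduler signature and the sync_get name are hypothetical; this is not necessarily what #18516 implements):

```python
from typing import Any, Callable

Scheduler = Callable[..., Any]  # e.g. dask.local.get_sync, distributed.Client.get, or a local scheduler


def get_scheduler(scheduler: str | Scheduler) -> Scheduler:
    """Resolve a named scheduler, or pass a user-supplied callable through."""
    if callable(scheduler):
        # Let users inject any graph-executing callable, including
        # Dask's synchronous scheduler.
        return scheduler
    if scheduler == "synchronous":
        # Hypothetical name for the in-process scheduler from this PR.
        return sync_get
    if scheduler == "distributed":
        # Assumes an active dask.distributed client in this process.
        from distributed import get_client

        return get_client().get
    raise ValueError(f"Unknown scheduler: {scheduler!r}")
```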
