feat(python, rust): Add GPU support to sink_* APIs #20940


Merged: 26 commits merged into pola-rs:main from gpu/sink_csv on Apr 21, 2025

Conversation

mroeschke
Contributor

@mroeschke mroeschke commented Jan 28, 2025

xref #20259

Makes the IR::Sink node serializable to Python so the sink options can be exposed in cuDF

@mroeschke mroeschke changed the title WIP: Add GPU engine to sink_csv feat(python): WIP: Add GPU engine to sink_csv Jan 29, 2025
@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature python Related to Python Polars and removed title needs formatting labels Jan 29, 2025
@mroeschke mroeschke marked this pull request as ready for review February 4, 2025 03:43

codecov bot commented Feb 5, 2025

Codecov Report

Attention: Patch coverage is 0% with 9 lines in your changes missing coverage. Please review.

Project coverage is 80.78%. Comparing base (4f75730) to head (fdcb58d).
Report is 51 commits behind head on main.

Files with missing lines Patch % Lines
...rates/polars-python/src/lazyframe/visitor/nodes.rs 0.00% 9 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #20940      +/-   ##
==========================================
- Coverage   80.79%   80.78%   -0.02%     
==========================================
  Files        1639     1639              
  Lines      235551   235557       +6     
  Branches     2714     2714              
==========================================
- Hits       190315   190283      -32     
- Misses      44595    44633      +38     
  Partials      641      641              


@mroeschke mroeschke changed the title feat(python): WIP: Add GPU engine to sink_csv feat(python): Add GPU engine to sink_csv Feb 6, 2025
@mroeschke mroeschke changed the title feat(python): Add GPU engine to sink_csv feat(python, rust): Add GPU engine to sink_csv Feb 6, 2025
@mroeschke
Contributor Author

mroeschke commented Feb 7, 2025

@wence- any suggestions on how we should expose the FunctionIR::Pipeline node to Python? It appears polars.LazyFrame.sink_csv uses this node. I suppose we can't accept all types of pipeline nodes (which we would also need to raise on in cuDF).

FunctionIR::Pipeline {
function: _,
schema: _,
original: _,

@wence-
Collaborator

wence- commented Feb 11, 2025

@wence- any suggestions on how we should expose the FunctionIR::Pipeline node to Python? It appears polars.LazyFrame.sink_csv uses this node. I suppose we can't accept all types of pipeline nodes (which we would also need to raise on in cuDF).

FunctionIR::Pipeline {
function: _,
schema: _,
original: _,

I think I need some more context here to understand what is going on. Where does that pipeline come from when you do sink_csv? I don't see it.

@mroeschke
Contributor Author

So with this PR at 6679b4b and rapidsai/cudf#17938 at 73bf00f, this is the error I'm getting so far:

In [1]: import polars as pl

In [2]: pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
<ipython-input-2-efb5e662c021>:1: DeprecationWarning: The old streaming engine is being deprecated and will soon be replaced by the new streaming engine. Starting Polars version 1.23.0 and until the new streaming engine is released, the old streaming engine may become less usable. For people who rely on the old streaming engine, it is suggested to pin your version to before 1.23.0.

More information on the new streaming engine: https://github.com/pola-rs/polars/issues/20947
  pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
/polars/py-polars/polars/lazyframe/frame.py:2806: PerformanceWarning: Query execution with GPU not possible: unsupported operations.
The errors were:
- NotImplementedError: pipeline mapfunction
  return lf.sink_csv(
run UdfExec
RUN STREAMING PIPELINE
[df -> ordered_sink]

Where the NotImplementedError is coming from

} => return Err(PyNotImplementedError::new_err("pipeline mapfunction")),

So I'm assuming we'll need to translate the FunctionIR::Pipeline node to Python for any of the polars sink_* methods.

@wence-
Collaborator

wence- commented Feb 12, 2025

Ah OK, I see. If you do let ldf = ldf.clone().with_streaming(false) then you won't see the Pipeline node. However, you won't see a Sink node either (because the wrapping of the logical plan in a Sink happens inside ldf.sink_csv(...)).

// Unpack the arena's.
// At this point the `nt` is useless.

std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
Contributor Author

Thanks for the tips, @wence-, at #20940 (comment).

Now I'm able to get cuDF to write the CSV successfully, but the polars call raises this error. I think the failure might be somewhere around here:

In [1]: import polars as pl

In [2]: pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
---------------------------------------------------------------------------
ComputeError                              Traceback (most recent call last)
Cell In[2], line 1
----> 1 pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")

File ~/polars/py-polars/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     55 @wraps(function)
     56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
     57     issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58     return function(*args, **kwargs)

File ~/polars/py-polars/polars/lazyframe/frame.py:2806, in LazyFrame.sink_csv(self, path, include_bom, include_header, separator, line_terminator, quote_char, batch_size, datetime_format, date_format, time_format, float_scientific, float_precision, null_value, quote_style, maintain_order, type_coercion, _type_check, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, collapse_joins, no_optimization, storage_options, credential_provider, retries, engine)
   2803 else:
   2804     callback = None
-> 2806 return lf.sink_csv(
   2807     path=normalize_filepath(path),
   2808     include_bom=include_bom,
   2809     include_header=include_header,
   2810     separator=ord(separator),
   2811     line_terminator=line_terminator,
   2812     quote_char=ord(quote_char),
   2813     batch_size=batch_size,
   2814     datetime_format=datetime_format,
   2815     date_format=date_format,
   2816     time_format=time_format,
   2817     float_scientific=float_scientific,
   2818     float_precision=float_precision,
   2819     null_value=null_value,
   2820     quote_style=quote_style,
   2821     maintain_order=maintain_order,
   2822     cloud_options=storage_options,
   2823     credential_provider=credential_provider,
   2824     retries=retries,
   2825     lambda_post_opt=callback,
   2826 )

ComputeError: expected tuple got None

In [3]: cat foo.csv
0
2

Collaborator

The problem is that the callback inserts a PythonScan node into the IR. The execution of this node expects the callback to return either a DataFrame or a non-empty iterable of dataframe chunks.

Of course, because we're sinking to a file, we can return no such thing.

I suspect we will want an equivalent to the PythonScan callback, which might be a PythonSink. Any thoughts @ritchie46?
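The mismatch can be sketched in plain Python. All names here are hypothetical illustrations, not actual Polars internals: they only show why a scan-style callback contract cannot serve a sink.

```python
# Hypothetical sketch of the callback contracts discussed above. The names
# (scan_callback, sink_callback, run_python_scan) are illustrative only.

def scan_callback():
    # A PythonScan callback is expected to return a DataFrame or a
    # non-empty iterable of DataFrame chunks (stand-in dicts used here).
    return [{"a": [1, 2]}]

def sink_callback():
    # A sink writes to a file as a side effect; there is no DataFrame
    # to hand back, so it returns None.
    return None

def run_python_scan(callback):
    # Mirrors the executor's assumption that a scan-style value comes
    # back; a None return surfaces as "expected tuple got None".
    result = callback()
    if result is None:
        raise ValueError("expected tuple got None")
    return list(result)
```

A sink-aware node would instead accept (and expect) the None return.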

Member

This needs some architectural changes. The Sink architecture is unstable at the moment and we will connect the new streaming engine in a few weeks. Once we have that, I think we can prepare for a "special" kind of sink that offloads to the GPU engine and doesn't expect a DataFrame to be returned. Currently that is the assumption.


@ritchie46 ritchie46 left a comment


I think this is blocked until we have some architectural changes on the sink side. I expect to get to this in a few weeks.

@@ -2615,6 +2615,7 @@ def sink_csv(
| Literal["auto"]
| None = "auto",
retries: int = 2,
engine: EngineType = "cpu",
Member

Suggested change
engine: EngineType = "cpu",
engine: EngineType = "streaming",

@@ -2721,6 +2722,26 @@ def sink_csv(
at any point without it being considered a breaking change.
retries
Number of retries if accessing a cloud instance fails.
engine
Select the engine used to write the query result, optional.
If set to `"cpu"` (default), the query result is written using the
Member

Our "cpu" engine typically means our in-memory engine, which doesn't support sinking without collecting into memory first. I think this should only have "streaming" and "gpu" options.
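The narrowed set of engine options could be enforced with a small validation helper. This is a hedged sketch with hypothetical names, not code from the PR:

```python
# Hypothetical sketch of the narrowed `engine` options for sink_* methods,
# per the review note: the in-memory "cpu" engine cannot sink without
# collecting first, so only "streaming" and "gpu" make sense here.
VALID_SINK_ENGINES = {"streaming", "gpu"}

def validate_sink_engine(engine: str) -> str:
    if engine not in VALID_SINK_ENGINES:
        msg = f"invalid engine {engine!r}; expected one of {sorted(VALID_SINK_ENGINES)}"
        raise ValueError(msg)
    return engine
```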


@@ -181,7 +181,7 @@ where
Box::new(OrderedSink::new(input_schema.into_owned())) as Box<dyn SinkTrait>
},
#[allow(unused_variables)]
SinkTypeIR::File(FileSinkType {
SinkTypeIRf::File(FileSinkType {
Collaborator

Suggested change
SinkTypeIRf::File(FileSinkType {
SinkTypeIR::File(FileSinkType {

Comment on lines 491 to 531
impl<'py> IntoPyObject<'py> for Wrap<CsvWriterOptions> {
type Target = PyDict;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;

fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
let dict = PyDict::new(py);
let _ = dict.set_item("include_bom", self.0.include_bom);
let _ = dict.set_item("include_header", self.0.include_header);
let _ = dict.set_item("batch_size", self.0.batch_size);
let _ = dict.set_item("serialize_options", Wrap(self.0.serialize_options));
Ok(dict)
}
}

impl<'py> IntoPyObject<'py> for Wrap<SerializeOptions> {
type Target = PyDict;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;

fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
let dict = PyDict::new(py);
let _ = dict.set_item("date_format", self.0.date_format);
let _ = dict.set_item("time_format", self.0.time_format);
let _ = dict.set_item("datetime_format", self.0.datetime_format);
let _ = dict.set_item("float_scientific", self.0.float_scientific);
let _ = dict.set_item("float_precision", self.0.float_precision);
let _ = dict.set_item("separator", self.0.separator);
let _ = dict.set_item("quote_char", self.0.quote_char);
let _ = dict.set_item("null", self.0.null);
let _ = dict.set_item("line_terminator", self.0.line_terminator);
let _ = dict.set_item("quote_style", Wrap(self.0.quote_style));
Ok(dict)
}
}

impl<'py> IntoPyObject<'py> for Wrap<QuoteStyle> {
type Target = PyString;
type Output = Bound<'py, Self::Target>;
type Error = Infallible;

Collaborator

suggestion: These are "small" dicts, but to avoid them getting out of date, I've been using the automatic serde-based serialisation for such options, which also isolates things to the translation layer.

Comment on lines 661 to 691
FileType::Parquet(options) => Box::new(ParquetSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(options) => Box::new(IpcSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "csv")]
FileType::Csv(options) => Box::new(CsvSink::new(
path,
options.clone(),
input_schema.as_ref(),
cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "json")]
FileType::Json(options) => Box::new(JsonSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
)?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
_ => unreachable!(),
Collaborator

Perhaps we can just do the following to get all the relevant info, serialised as json (which means that if bits change we don't have to make too many matching updates here):

        IR::Sink { input, payload } => Sink {
            input: input.0,
            payload: serde_json::to_string(payload)
                .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
        }
        .into_py_any(py),

WDYT?

Contributor Author

Ah thanks! This works well - I see that is done for the IO readers as well.
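On the consuming side, decoding a serde-serialised payload like the one above is just standard-library JSON. A hedged sketch, with illustrative field names rather than the exact option schema:

```python
import json

# With the serde-based approach, the Python side receives the sink
# payload as a JSON string and can decode it with the stdlib. The
# field names and values below are made up for illustration.
payload = '{"include_header": true, "batch_size": 1024, "separator": 44}'
options = json.loads(payload)
```

This is what keeps the translation layer insulated from option-struct changes: new fields simply show up as new dict keys.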

@mroeschke mroeschke changed the title feat(python, rust): Add GPU engine to sink_csv feat(python, rust): Add GPU support to sink_* APIs Apr 9, 2025
@mroeschke
Contributor Author

@ritchie46 could you re-review this PR? (It's gotten a lot simpler with the dedicated Sink IR node.)


@ritchie46 ritchie46 left a comment


Yes, a lot simpler. :)

@ritchie46 ritchie46 merged commit e66d3aa into pola-rs:main Apr 21, 2025
27 checks passed
@github-project-automation github-project-automation bot moved this from Todo to Done in cuDF Python Apr 21, 2025
@mroeschke mroeschke deleted the gpu/sink_csv branch April 21, 2025 17:50
rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request May 15, 2025
Depends on pola-rs/polars#20940

closes #16738

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #18468