feat(python, rust): Add GPU support to sink_* APIs #20940
Conversation
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main   #20940      +/-   ##
==========================================
- Coverage   80.79%   80.78%    -0.02%
==========================================
  Files        1639     1639
  Lines      235551   235557        +6
  Branches     2714     2714
==========================================
- Hits       190315   190283       -32
- Misses      44595    44633       +38
  Partials      641      641
```
@wence- any suggestions on how we should expose the sink options here? (See polars/crates/polars-python/src/lazyframe/visitor/nodes.rs, lines 562 to 565 at 15fa50a.)
I think I need some more context here to understand what is going on. Where does that pipeline come from when you do `sink_csv`?
So with this PR at 6679b4b and rapidsai/cudf#17938 at 73bf00f, this is the error I'm getting so far:

```python
In [1]: import polars as pl

In [2]: pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
<ipython-input-2-efb5e662c021>:1: DeprecationWarning: The old streaming engine is being deprecated and will soon be replaced by the new streaming engine. Starting Polars version 1.23.0 and until the new streaming engine is released, the old streaming engine may become less usable. For people who rely on the old streaming engine, it is suggested to pin your version to before 1.23.0.
More information on the new streaming engine: https://github.com/pola-rs/polars/issues/20947
  pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
/polars/py-polars/polars/lazyframe/frame.py:2806: PerformanceWarning: Query execution with GPU not possible: unsupported operations.
The errors were:
- NotImplementedError: pipeline mapfunction
  return lf.sink_csv(
run UdfExec
RUN STREAMING PIPELINE
[df -> ordered_sink]
```

Where the …

So I'm assuming we'll need to translate the …
Ah OK, I see. If you …
```rust
// Unpack the arenas.
// At this point the `nt` is useless.

std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
```
Thanks for the tips @wence- at #20940 (comment).
Now I'm able to get cudf to write the csv successfully, but the polars call raises this error. I think the failure might be somewhere here?
```python
In [1]: import polars as pl

In [2]: pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")
---------------------------------------------------------------------------
ComputeError                              Traceback (most recent call last)
Cell In[2], line 1
----> 1 pl.LazyFrame({"1": 2}).sink_csv("foo.csv", engine="gpu")

File ~/polars/py-polars/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     55 @wraps(function)
     56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
     57     issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58     return function(*args, **kwargs)

File ~/polars/py-polars/polars/lazyframe/frame.py:2806, in LazyFrame.sink_csv(self, path, include_bom, include_header, separator, line_terminator, quote_char, batch_size, datetime_format, date_format, time_format, float_scientific, float_precision, null_value, quote_style, maintain_order, type_coercion, _type_check, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, collapse_joins, no_optimization, storage_options, credential_provider, retries, engine)
   2803 else:
   2804     callback = None
-> 2806 return lf.sink_csv(
   2807     path=normalize_filepath(path),
   2808     include_bom=include_bom,
   2809     include_header=include_header,
   2810     separator=ord(separator),
   2811     line_terminator=line_terminator,
   2812     quote_char=ord(quote_char),
   2813     batch_size=batch_size,
   2814     datetime_format=datetime_format,
   2815     date_format=date_format,
   2816     time_format=time_format,
   2817     float_scientific=float_scientific,
   2818     float_precision=float_precision,
   2819     null_value=null_value,
   2820     quote_style=quote_style,
   2821     maintain_order=maintain_order,
   2822     cloud_options=storage_options,
   2823     credential_provider=credential_provider,
   2824     retries=retries,
   2825     lambda_post_opt=callback,
   2826 )

ComputeError: expected tuple got None

In [3]: cat foo.csv
0
2
```
The problem is that the callback inserts a `PythonScan` node into the IR. The execution of this node expects the callback to return either a `DataFrame` or a non-empty iterable of dataframe chunks. Of course, because we're sinking to a file, we can return no such thing. I suspect we will want an equivalent to the `PythonScan` callback, which might be a `PythonSink`. Any thoughts @ritchie46?
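To illustrate the mismatch, a hypothetical sketch (the callback names and signatures here are illustrative, not the actual polars internals):

```python
import polars as pl

def scan_callback(ir: object) -> pl.DataFrame:
    # A PythonScan-style callback must produce data: the engine consumes
    # the returned DataFrame (or an iterable of chunks).
    return pl.DataFrame({"1": [2]})

def sink_callback(ir: object) -> None:
    # A sink callback writes the result to a file as a side effect and has
    # nothing to hand back, which is what trips the
    # "ComputeError: expected tuple got None" above.
    pl.DataFrame({"1": [2]}).write_csv("foo.csv")
```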
This needs some architectural changes. The `Sink` architecture is unstable at the moment, and we will connect the new streaming engine in a few weeks. Once we have that, I think we can prepare for a "special" kind of sink that offloads to the GPU engine and doesn't expect a `DataFrame` to be returned. Currently that is an assumption the engine makes.
I think this is blocked until we have some architectural changes on the sink side. I expect to get to this in a few weeks.
py-polars/polars/lazyframe/frame.py (outdated)

```diff
@@ -2615,6 +2615,7 @@ def sink_csv(
         | Literal["auto"]
         | None = "auto",
         retries: int = 2,
+        engine: EngineType = "cpu",
```
Suggested change:

```diff
-        engine: EngineType = "cpu",
+        engine: EngineType = "streaming",
```
py-polars/polars/lazyframe/frame.py (outdated)

```diff
@@ -2721,6 +2722,26 @@ def sink_csv(
             at any point without it being considered a breaking change.
         retries
             Number of retries if accessing a cloud instance fails.
+        engine
+            Select the engine used to write the query result, optional.
+            If set to `"cpu"` (default), the query result is written using the
```
Our "cpu" engine typically means our in-memory engine, this doesn't support sinking without collecting into memory first. I think this should only have "streaming" and "gpu" options.
```diff
@@ -181,7 +181,7 @@ where
             Box::new(OrderedSink::new(input_schema.into_owned())) as Box<dyn SinkTrait>
         },
         #[allow(unused_variables)]
-        SinkTypeIR::File(FileSinkType {
+        SinkTypeIRf::File(FileSinkType {
```
Suggested change:

```diff
-        SinkTypeIRf::File(FileSinkType {
+        SinkTypeIR::File(FileSinkType {
```
```rust
// Hand-written conversion of the CSV writer options into a Python dict,
// built field by field.
impl<'py> IntoPyObject<'py> for Wrap<CsvWriterOptions> {
    type Target = PyDict;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;

    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        let dict = PyDict::new(py);
        let _ = dict.set_item("include_bom", self.0.include_bom);
        let _ = dict.set_item("include_header", self.0.include_header);
        let _ = dict.set_item("batch_size", self.0.batch_size);
        let _ = dict.set_item("serialize_options", Wrap(self.0.serialize_options));
        Ok(dict)
    }
}

impl<'py> IntoPyObject<'py> for Wrap<SerializeOptions> {
    type Target = PyDict;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;

    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        let dict = PyDict::new(py);
        let _ = dict.set_item("date_format", self.0.date_format);
        let _ = dict.set_item("time_format", self.0.time_format);
        let _ = dict.set_item("datetime_format", self.0.datetime_format);
        let _ = dict.set_item("float_scientific", self.0.float_scientific);
        let _ = dict.set_item("float_precision", self.0.float_precision);
        let _ = dict.set_item("separator", self.0.separator);
        let _ = dict.set_item("quote_char", self.0.quote_char);
        let _ = dict.set_item("null", self.0.null);
        let _ = dict.set_item("line_terminator", self.0.line_terminator);
        let _ = dict.set_item("quote_style", Wrap(self.0.quote_style));
        Ok(dict)
    }
}

impl<'py> IntoPyObject<'py> for Wrap<QuoteStyle> {
    type Target = PyString;
    type Output = Bound<'py, Self::Target>;
    type Error = Infallible;
    // … (remainder of the snippet truncated in the review)
```
suggestion: These are "small" dicts, but to avoid them getting out of date, I've been using the automatic serde-based serialisation for these options, which also isolates things to just the translation layer.
```rust
FileType::Parquet(options) => Box::new(ParquetSink::new(
    path,
    *options,
    input_schema.as_ref(),
    cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(options) => Box::new(IpcSink::new(
    path,
    *options,
    input_schema.as_ref(),
    cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "csv")]
FileType::Csv(options) => Box::new(CsvSink::new(
    path,
    options.clone(),
    input_schema.as_ref(),
    cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "json")]
FileType::Json(options) => Box::new(JsonSink::new(
    path,
    *options,
    input_schema.as_ref(),
    cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
_ => unreachable!(),
```
Perhaps we can just do the following to get all the relevant info, serialised as JSON (which means that if bits change we don't have to make too many matching updates here):

```rust
IR::Sink { input, payload } => Sink {
    input: input.0,
    payload: serde_json::to_string(payload)
        .map_err(|err| PyValueError::new_err(format!("{err:?}")))?,
}
.into_py_any(py),
```

WDYT?
Ah thanks! This works well. I see that is done for the IO readers as well.
@ritchie46 could you re-review this PR? (It's gotten a lot simpler with the dedicated `Sink` IR node.)
Yes, a lot simpler. :)
Depends on pola-rs/polars#20940. Closes #16738.

Authors:
- Matthew Roeschke (https://github.com/mroeschke)
- Matthew Murray (https://github.com/Matt711)

Approvers:
- Yunsong Wang (https://github.com/PointKernel)
- Vukasin Milovanovic (https://github.com/vuule)
- Lawrence Mitchell (https://github.com/wence-)

URL: #18468
xref #20259

Makes the `IR::Sink` node serializable to Python so the sink options can be exposed in cuDF.
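As a rough illustration of what this enables on the cuDF side (a hypothetical sketch; the payload field names here are made up, not the exact schema polars serialises):

```python
import json

# Hypothetical JSON payload handed over by the visitor's Sink node.
payload = '{"include_bom": false, "include_header": true, "batch_size": 1024}'

# The GPU engine can deserialise the sink options and apply them when
# writing the file itself.
options = json.loads(payload)
assert options["include_header"] is True
```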