feat(api): Vectorized UDAFs #4707
duckdb does not support scalar user-defined functions written in Python (applied one record at a time), but it does expose a vectorized Python UDF via the `map` method. The main motivation for this vectorized Python UDF API is probably to hide the per-record Python function call overhead. I think it's a pragmatic API, and it would make it possible to efficiently deploy trained machine learning models for batch scoring in an out-of-core manner, for instance.

Any chance to expose such vectorized Python UDFs via the Ibis API?

Also, if some backends include or add support for Python UDAFs (especially running in parallel via combiners in addition to mappers and reducers), this would open the possibility of training machine learning models (e.g. with scikit-learn or PyTorch) directly via Ibis. As far as I know, duckdb does not expose parallel Python UDAFs, unfortunately.

Final side request: for backends that only support scalar UDFs, would it be possible for Ibis to generate the SQL required to do the chunking itself and expose a vectorized UDF API to hide the Python function call overhead, similarly to what duckdb does internally with `map`?
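For reference, a minimal sketch of the duckdb `map` mechanism described above, assuming a recent duckdb release (where `connect`, `sql`, and `DuckDBPyRelation.map` are available); the function is invoked once per chunk with a pandas DataFrame:

```python
import duckdb
import pandas as pd


def add_one(df: pd.DataFrame) -> pd.DataFrame:
    # Called once per batch of records, not once per record, which hides
    # the per-record Python function call overhead.
    return pd.DataFrame({"y": df["x"] + 1})


con = duckdb.connect()
rel = con.sql("SELECT range AS x FROM range(1000000)")
print(rel.map(add_one).df())
```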
Comments
@ogrisel Thanks for the issue. This is definitely on our radar and we'll probably start experimenting with support for this in the next month. In fact, the DuckDB folks just pointed us to…
So, in short, yes there's a great chance of this happening :)
This is an interesting path for us to go down; it's great to hear a concrete use case for UDAFs since getting them to work well with a nice API and solid performance will be challenging.
Possibly! I think we'll need to do some prototyping before we can give a concrete yes or no to this. Really appreciate all the issues you're opening; it's wonderful to get feedback from users ❤️
Note that UDF (without the A) support is enough to "apply" a trained model to a large dataset to score (compute the predictions / recommendations / price estimates / anomaly scores...) in an out-of-core, and possibly distributed, manner (if the backend is distributed), assuming the UDF workers are long-lived and can reuse a cached model loaded by name from a shared filesystem or object store to process several chunks without re-paying the filesystem I/O and model-loading overhead for each chunk. This alone would be quite valuable, I think. Of course, distributed model training with UDAFs would also be interesting, but it would require a lot more work (and internal changes in scikit-learn in particular).
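For instance, here's a hedged sketch of that cached-model scoring pattern, assuming a backend with vectorized pandas UDF support (the `ibis.udf.scalar.pandas` decorator from recent Ibis versions); the model name, path, and feature handling are hypothetical:

```python
from functools import lru_cache

import ibis
import pandas as pd


@lru_cache(maxsize=1)
def _load_model(name: str):
    # Loaded once per long-lived worker process and reused across chunks,
    # so the filesystem I/O and deserialization costs are not paid per chunk.
    import joblib

    return joblib.load(f"/shared/models/{name}.joblib")  # hypothetical path


@ibis.udf.scalar.pandas
def score(x: float) -> float:
    # Receives a whole pandas Series per chunk rather than one record.
    model = _load_model("anomaly_detector")  # hypothetical model name
    return pd.Series(model.predict(x.to_frame()), index=x.index)
```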
Also, a related question: does the Dask backend support (vectorized) UDAFs? It should be quite natural to implement, but the documentation on UDFs is lacking.
Reading the ClickHouse docs, it seems that it would be natural to implement vectorized Python UDFs with ClickHouse, since records are batched by default: https://clickhouse.com/docs/en/sql-reference/functions/#executable-user-defined-functions. Both ClickHouse and pandas can efficiently exchange data via the Arrow format (chunked dataframes): https://clickhouse.com/docs/en/interfaces/formats/#data-format-arrow, so implementing a duckdb-style vectorized UDF API on top of the ClickHouse backend seems feasible.
@cpcloud I drafted a proposal for a Python UDAF API in a duckdb issue, if you are interested: duckdb/duckdb#5117.
cc @icexelloss |
We now have vectorized scalar UDFs in the DuckDB backend. Check out the related blog post. |
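A minimal usage sketch, under the assumption that the decorator from that blog post is `ibis.udf.scalar.pandas` (as in recent Ibis releases):

```python
import ibis


@ibis.udf.scalar.pandas
def add_one(x: int) -> int:
    # x is a pandas Series covering a whole batch of rows.
    return x + 1


t = ibis.memtable({"x": [1, 2, 3]})
print(t.mutate(y=add_one(t.x)).to_pandas())
```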
Thanks, this is great. A few remarks:
Yeah, this is something we need to add. I'll create an issue to track this work.
That's a good question. Here's an extremely contrived but hopefully illustrative example of how you might do this:
The approach is basically to stuff everything into a struct and then unpack it into columns. The primary downside is that column names must be declared statically: you can't use the result of the UDF to compute the column names.
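The example itself didn't survive this extraction, so here is a hedged sketch of the pattern, assuming a recent Ibis where `ibis.udf.scalar.pandas` accepts a struct-typed return annotation and a struct-returning pandas UDF can return a DataFrame whose columns match the declared fields; the field names are illustrative:

```python
import ibis
import ibis.expr.datatypes as dt
import pandas as pd


@ibis.udf.scalar.pandas
def stats(x: float) -> dt.Struct({"centered": "float64", "zscore": "float64"}):
    # The struct's field names are declared statically in the annotation;
    # they cannot be computed from the UDF's output at runtime.
    mean, std = x.mean(), x.std()
    return pd.DataFrame({"centered": x - mean, "zscore": (x - mean) / std})


t = ibis.memtable({"x": [1.0, 2.0, 3.0]})
# Unpack the statically declared struct fields into separate columns.
expr = t.mutate(s=stats(t.x)).unpack("s")
```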
AFAIK, no. At what point do you want to control the chunking? When you call `to_pandas()`? What would your ideal API look like for controlling the chunking?
Not sure what you mean by "when you call `to_pandas()`" (when calling the function itself?).
Maybe as an argument to the decorator, to either control the target number of records per chunk or the average size in bytes of each chunk. But honestly this might be a YAGNI for most people.
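Something like the following, purely hypothetical, sketch — neither keyword argument exists in Ibis:

```python
import ibis


# Hypothetical API: chunking hints passed to the vectorized UDF decorator.
@ibis.udf.scalar.pandas(max_rows_per_chunk=100_000)  # not a real parameter
def score(x: float) -> float:
    ...
```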
That's fine. Since the type declaration, the knowledge of…
Vectorized (per-element) UDFs have been around for a while, retitling this to reflect that. |
Closing due to lack of activity. |
Hi @cpcloud, hopefully I will get a response to this. I am looking to process multiple groups of an Ibis table with one function. Can I do this in parallel in Ibis in some way?

Scenario: I have multiple items and I want to apply the function below to these items in parallel. Can you suggest something, @cpcloud?

```python
import ibis
from ibis.expr.types import Table


def modify_zero_demand_as_missing_or_legitmate(timeseries: Table) -> Table:
    # Compute mean and stddev without zeros and NaNs (NaNs are not included
    # by default in the mean and std aggregations).
    timeseries = timeseries.order_by("WEEK").mutate(
        INDEX=ibis.row_number().over(order_by="WEEK")
    )
    non_zero_mean = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.mean().execute()
    )
    non_zero_std = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.std().execute()
    )
    consider_zero_as_missing = 0 < (non_zero_mean - 3 * non_zero_std)
    if consider_zero_as_missing:
        # Treat zero sales after the first recorded sale as missing values.
        first_sale_index = timeseries.filter(
            timeseries.SALES_QTY != 0
        ).INDEX.min()
        timeseries = timeseries.mutate(
            SALES_QTY=ibis.case()
            .when(
                (timeseries.INDEX >= first_sale_index)
                & (timeseries.SALES_QTY == 0),
                None,
            )
            .else_(timeseries.SALES_QTY)
            .end()
        )
    return timeseries
```

Thank you
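One possible workaround, sketched here under heavy assumptions (this is not an answer from the thread): split the table by a hypothetical `ITEM_ID` column and run the function per item in a thread pool, where each call issues its own queries to the backend:

```python
from concurrent.futures import ThreadPoolExecutor


def process_all_items(table):
    # Hypothetical grouping column; each group is one item's time series.
    item_ids = table.ITEM_ID.execute().unique()

    def run_one(item_id):
        subset = table.filter(table.ITEM_ID == item_id)
        return modify_zero_demand_as_missing_or_legitmate(subset).execute()

    with ThreadPoolExecutor() as pool:
        return list(pool.map(run_one, item_ids))
```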