feat(api): Vectorized UDAFs #4707


Closed
ogrisel opened this issue Oct 23, 2022 · 14 comments
Labels
feature Features or general enhancements

Comments

@ogrisel
Contributor

ogrisel commented Oct 23, 2022

duckdb does not support scalar User Defined Functions written in Python (to be applied one record at a time), but it does expose a vectorized Python UDF via the map method:

>>> import pandas as pd
>>> import duckdb
>>> df = pd.DataFrame({"x": range(int(1e4))})
>>> def process_chunk(df_chunk):
...     print(f"processing chunk of size {df_chunk.shape[0]}")
...     return df_chunk * 2
... 
>>> duckdb.from_df(df).map(process_chunk).to_df()
processing chunk of size 0
processing chunk of size 0
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 784
          x
0         0
1         2
2         4
3         6
4         8
...     ...
9995  19990
9996  19992
9997  19994
9998  19996
9999  19998

[10000 rows x 1 columns]

The main motivation for this vectorized Python UDF API is probably to hide the per-record Python function call overhead. I think it's a pragmatic API, and it would, for instance, make it possible to efficiently deploy trained machine learning models for batch scoring in an out-of-core manner.

Any chance to expose such vector Python UDFs via the Ibis API?

Also, if some backends include or add support for Python UDAFs (especially ones that run in parallel via combiners in addition to mappers and reducers), this would open the possibility of training machine learning models (e.g. with scikit-learn or PyTorch) directly via Ibis. As far as I know, duckdb unfortunately does not expose parallel Python UDAFs.
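
To make the shape of such a UDAF concrete, here is a minimal sketch of the mapper/combiner/reducer protocol I have in mind, using a running mean as the aggregate (the function names are illustrative, not an existing Ibis or duckdb API):

import pandas as pd

def mapper(chunk: pd.Series) -> tuple[int, float]:
    # Per-chunk partial state: (count, sum).
    return len(chunk), float(chunk.sum())

def combiner(a: tuple[int, float], b: tuple[int, float]) -> tuple[int, float]:
    # Merge partial states coming from parallel workers.
    return a[0] + b[0], a[1] + b[1]

def reducer(state: tuple[int, float]) -> float:
    # Finalize: turn the merged state into the aggregate value.
    count, total = state
    return total / count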

Final side-request: for backends that only support scalar UDFs, would it be possible for Ibis to generate the SQL required to do the chunking itself and expose a vectorized UDF API to hide the Python function call overhead, similarly to what duckdb does internally with map?

@cpcloud
Member

cpcloud commented Oct 23, 2022

@ogrisel Thanks for the issue. This is definitely on our radar and we'll probably start experimenting with support for this in the next month. In fact, the DuckDB folks just pointed us to .map a few weeks ago.

Any chance to expose such vector Python UDFs via the Ibis API?

So, in short, yes there's a great chance of this happening :)

this would open the possibility of training machine learning models (e.g. with scikit-learn or PyTorch) directly via Ibis

This is an interesting path for us to go down; it's great to hear a concrete use case for UDAFs since getting them to work well with a nice API and solid performance will be challenging.

for backends that only support scalar UDFs, would it be possible for Ibis to generate the SQL required to do the chunking itself and expose a vectorized UDF API to hide the Python function call overhead, similarly to what duckdb does internally with map?

Possibly! I think we'll need to do some prototyping before we can give a concrete yes or no to this.

Really appreciate all the issues you're opening, it's wonderful to get feedback from users ❤️

@cpcloud changed the title from "Feature request: Vector Python UDFs (and UDAFs)" to "feat Vector Python UDFs (and UDAFs)" Oct 23, 2022
@cpcloud changed the title from "feat Vector Python UDFs (and UDAFs)" to "feat(api): Vector Python UDFs (and UDAFs)" Oct 23, 2022
@cpcloud added the feature label Oct 23, 2022
@ogrisel
Contributor Author

ogrisel commented Oct 23, 2022

this would open the possibility of training machine learning models (e.g. with scikit-learn or PyTorch) directly via Ibis

This is an interesting path for us to go down; it's great to hear a concrete use case for UDAFs since getting them to work well with a nice API and solid performance will be challenging.

Note that UDF (without the A) support is enough to "apply" a trained model to a large dataset for scoring (computing the predictions / recommendations / price estimates / anomaly scores...) in an out-of-core, and possibly distributed, manner (if the backend is distributed), assuming the UDF workers are long-lived and can reuse a cached model loaded by name from a shared filesystem or object store to process several chunks without re-paying the filesystem I/O and model loading overhead for each chunk.

This alone would be quite valuable, I think. Of course, distributed model training with UDAFs would also be interesting, but it would require a lot more work (and internal changes in scikit-learn in particular).
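
For illustration, here is a minimal sketch of what such out-of-core scoring could look like today with duckdb's map, assuming a fitted scikit-learn model saved with joblib (the model path and features_df are just placeholders):

import functools

import duckdb
import joblib
import pandas as pd

@functools.lru_cache(maxsize=1)
def load_model(path):
    # Loaded once per long-lived worker and reused across chunks, so the
    # filesystem I/O and deserialization overhead is not re-paid per chunk.
    return joblib.load(path)

def score_chunk(df_chunk: pd.DataFrame) -> pd.DataFrame:
    model = load_model("/shared/models/model.joblib")  # placeholder path
    return pd.DataFrame({"score": model.predict(df_chunk)})

# e.g. duckdb.from_df(features_df).map(score_chunk).to_df()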

@ogrisel
Contributor Author

ogrisel commented Oct 23, 2022

Also, a related question: does the dask backend support (vectorized) UDAFs? It should be quite natural to implement; however, the documentation on UDFs is lacking.

@ogrisel
Contributor Author

ogrisel commented Oct 23, 2022

Reading the clickhouse docs, it seems that it would be natural to implement vectorized Python UDFs with clickhouse, since records are batched by default:

https://clickhouse.com/docs/en/sql-reference/functions/#executable-user-defined-functions

Both clickhouse and pandas can efficiently exchange data via the Arrow format (chunked dataframes): https://clickhouse.com/docs/en/interfaces/formats/#data-format-arrow, so implementing a duckdb-style map that presents the chunk data as a pandas DataFrame should be quite easy to do for this backend.
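
As an illustration, here is a minimal sketch of what the script side of such an executable UDF could look like, assuming the function is configured to exchange data via the ArrowStream format (the column name x is illustrative):

import sys

import pyarrow as pa

# Read record batches from clickhouse on stdin, process each chunk as a
# pandas DataFrame, and stream the results back on stdout.
reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    df = batch.to_pandas()
    df["x"] = df["x"] * 2  # vectorized work on the whole chunk
    out = pa.RecordBatch.from_pandas(df, preserve_index=False)
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, out.schema)
    writer.write_batch(out)
if writer is not None:
    writer.close()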

@ogrisel
Contributor Author

ogrisel commented Oct 28, 2022

@cpcloud I drafted a proposal for a Python UDAF API in a duckdb issue, if you are interested: duckdb/duckdb#5117.

@jreback
Contributor

jreback commented Oct 28, 2022

cc @icexelloss

@cpcloud
Member

cpcloud commented Jul 5, 2023

We now have vectorized scalar UDFs in the DuckDB backend. Check out the related blog post.

@ogrisel
Contributor Author

ogrisel commented Aug 21, 2023

We now have vectorized scalar UDFs in the DuckDB backend. Check out the related blog post.

Thanks, this is great. A few remarks:

  • I cannot find those in the reference documentation. I had to read the source code to list the various UDF alternatives (Python scalar values, Pandas Series and PyArrow Arrays);
  • While I could successfully write a test function that processes more than one input column at a time, I do not see how it would be possible to use this to write a function that returns more than one value and have the results interpreted as new individual columns with user-controllable names. If this is possible with the existing API, it would be great to showcase it in an example snippet, e.g. in the docstrings of the UDFs;
  • Is it possible to control the chunking? At the moment it is completely implicit. Maybe some users would like some control over the memory-usage / per-Python-call overhead tradeoff.

@cpcloud
Member

cpcloud commented Aug 21, 2023

I cannot find those in the reference documentation.

Yeah, this is something we need to add. I'll create an issue to track this work.

While I could successfully write a test function that processes more than one input column at a time, I do not see how it would be possible to use this to write a function that returns more than one value and have the results interpreted as new individual columns with user-controllable names.

That's a good question.

Here's an extremely contrived but hopefully illustrative example of how you might do this:

In [25]: from ibis.interactive import *

In [26]: import ibis.expr.datatypes as dt

In [27]: @udf.scalar.python
    ...: def make_struct(
    ...:     height: int, mass: float
    ...: ) -> dt.Struct(dict(height_cm="int", mass_kg="float")):
    ...:     return dict(height_cm=height, mass_kg=mass)

In [28]: t = ex.starwars.fetch(table_name="starwars")

In [29]: t.select(dims=make_struct(t.height, t.mass)).unpack("dims")
Out[29]:
┏━━━━━━━━━━━┳━━━━━━━━━┓
┃ height_cm ┃ mass_kg ┃
┡━━━━━━━━━━━╇━━━━━━━━━┩
│ int64     │ float64 │
├───────────┼─────────┤
│       172 │    77.0 │
│       167 │    75.0 │
│        96 │    32.0 │
│       202 │   136.0 │
│       150 │    49.0 │
│       178 │   120.0 │
│       165 │    75.0 │
│        97 │    32.0 │
│       183 │    84.0 │
│       182 │    77.0 │
│         … │       … │
└───────────┴─────────┘

In [30]: import pyarrow.compute as pc
    ...:
    ...:
    ...: @udf.scalar.pyarrow
    ...: def make_struct_arrow(
    ...:     height: int, mass: float
    ...: ) -> dt.Struct(dict(height_cm="int", mass_kg="float")):
    ...:     return pc.make_struct(height, mass, field_names=("height_cm", "mass_kg"))

In [31]: t.select(dims=make_struct_arrow(t.height, t.mass)).unpack("dims")
Out[31]:
┏━━━━━━━━━━━┳━━━━━━━━━┓
┃ height_cm ┃ mass_kg ┃
┡━━━━━━━━━━━╇━━━━━━━━━┩
│ int64     │ float64 │
├───────────┼─────────┤
│       172 │    77.0 │
│       167 │    75.0 │
│        96 │    32.0 │
│       202 │   136.0 │
│       150 │    49.0 │
│       178 │   120.0 │
│       165 │    75.0 │
│        97 │    32.0 │
│       183 │    84.0 │
│       182 │    77.0 │
│         … │       … │
└───────────┴─────────┘

The approach is basically to stuff everything into a struct and then unpack the columns into the containing table as needed.

The primary downside is that column names must be declared statically. You can't use the result of the UDF to compute the column names.

Is it possible to control the chunking?

AFAIK, no.

At what point do you want to control the chunking? When you call to_pandas(), or earlier, such as when you define the function?

What would your ideal API look like for controlling the chunking?

@ogrisel
Contributor Author

ogrisel commented Aug 22, 2023

At what point do you want to control the chunking? When you call to_pandas(), or earlier, such as when you define the function?

Not sure what you mean by "when you call to_pandas()" (when calling the function itself?).
I think it should be fine to do it at function definition time.

What would your ideal API look like for controlling the chunking?

Maybe as an argument to the decorator, to control either the target number of records per chunk or the average size in bytes of each chunk.

But honestly this might be a YAGNI for most people.
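
Purely as a hypothetical sketch of the decorator-argument idea (neither keyword argument exists in Ibis today):

from ibis import udf

@udf.scalar.pandas(chunk_size=10_000)  # hypothetical: target records per chunk
def double(x: float) -> float:
    return x * 2

@udf.scalar.pandas(chunk_bytes=64 * 2**20)  # hypothetical: target bytes per chunk
def triple(x: float) -> float:
    return x * 3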

@ogrisel
Contributor Author

ogrisel commented Aug 22, 2023

The primary downside is that column names must be declared statically. You can't use the result of the UDF to compute the column names.

That's fine. Since the struct type declaration, the knowledge of pyarrow.compute.make_struct, and the combination with the unpack method are not trivial, I think it would be helpful to demonstrate this more advanced case in an example doctest in the docstring.

@cpcloud
Member

cpcloud commented Jun 25, 2024

Vectorized (per-element) UDFs have been around for a while, retitling this to reflect that.

@cpcloud changed the title from "feat(api): Vector Python UDFs (and UDAFs)" to "feat(api): Vectorized UDAFs" Jun 25, 2024
@cpcloud
Member

cpcloud commented Sep 11, 2024

Closing due to lack of activity.

@cpcloud closed this as not planned Sep 11, 2024
@github-project-automation bot moved this from backlog to done in Ibis planning and roadmap Sep 11, 2024
@Hg03

Hg03 commented May 25, 2025

Hi @cpcloud, hopefully I will get a response to this. I am looking to process multiple groups of an ibis table with one function. Can I perform parallelization in ibis in some way?

Scenario

Item    Week    Sales_Qty
item1   ...     .....
item2   ...     .....
item1   ...     .....

I mean I have multiple items and I want to apply the function below to these items in a parallelized way. Can you suggest something, @cpcloud?

def modify_zero_demand_as_missing_or_legitmate(
    timeseries: ibis_table,
) -> ibis_table:
    # Compute mean and stddev without zeros and NaNs (NaNs are excluded by default by the mean and std methods).
    timeseries = timeseries.order_by("WEEK").mutate(
        INDEX=ibis.row_number().over(order_by="WEEK")
    )
    non_zero_mean = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.mean().execute()
    )
    non_zero_std = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.std().execute()
    )
    consider_zero_as_missing = 0 < (non_zero_mean - 3 * non_zero_std)
    if consider_zero_as_missing:
        first_sale_index = timeseries.filter(
            timeseries.SALES_QTY != 0
        ).INDEX.min()
        timeseries = timeseries.mutate(
            SALES_QTY=ibis.case()
            .when(
                (timeseries.INDEX >= first_sale_index)
                & (timeseries.SALES_QTY == 0),
                None,
            )
            .else_(timeseries.SALES_QTY)
            .end()
        )
    return timeseries

Thank you
