Skip to content

feat: support routines with ARRAY return type in read_gbq_function #1412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 5 commits on Feb 21, 2025
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions bigframes/core/compile/ibis_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,19 @@ def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
return python_type_to_ibis_type(t)


def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
def ibis_type_from_bigquery_type(
type_: bigquery.StandardSqlDataType,
) -> ibis_dtypes.DataType:
"""Convert bq type to ibis. Only to be used for remote functions, does not handle all types."""
if tk not in bigframes.dtypes.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS:
if type_.type_kind not in bigframes.dtypes.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS:
raise UnsupportedTypeError(
tk, bigframes.dtypes.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS
type_.type_kind, bigframes.dtypes.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS
)
elif type_.type_kind == "ARRAY":
return ibis_dtypes.Array(
value_type=ibis_type_from_bigquery_type(
typing.cast(bigquery.StandardSqlDataType, type_.array_element_type)
)
)
return third_party_ibis_bqtypes.BigQueryType.to_ibis(tk)
else:
return third_party_ibis_bqtypes.BigQueryType.to_ibis(type_.type_kind)
9 changes: 6 additions & 3 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4088,9 +4088,12 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)
result_series.name = None

# if the output is an array, reconstruct it from the json serialized
# string form
if bigframes.dtypes.is_array_like(func.output_dtype):
# If the result type is string but the function output is intended
# to be an array, reconstruct the array from the string assuming it
# is a json serialized form of the array.
if bigframes.dtypes.is_string_like(
result_series.dtype
) and bigframes.dtypes.is_array_like(func.output_dtype):
import bigframes.bigquery as bbq

result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
Expand Down
1 change: 1 addition & 0 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,4 +874,5 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
"INT64",
"INTEGER",
"STRING",
"ARRAY",
}
6 changes: 6 additions & 0 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def try_delattr(attr):
try_delattr("bigframes_remote_function")
try_delattr("input_dtypes")
try_delattr("output_dtype")
try_delattr("bigframes_bigquery_function_output_dtype")
try_delattr("is_row_processor")
try_delattr("ibis_node")

Expand Down Expand Up @@ -589,6 +590,11 @@ def try_delattr(attr):
ibis_signature.output_type
)
)
func.bigframes_bigquery_function_output_dtype = (
bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(
ibis_output_type_for_bqrf
)
)
func.is_row_processor = is_row_processor
func.ibis_node = node

Expand Down
12 changes: 8 additions & 4 deletions bigframes/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ class ReturnTypeMissingError(ValueError):
# TODO: Move this to compile folder
def ibis_signature_from_routine(routine: bigquery.Routine) -> _utils.IbisSignature:
if routine.return_type:
ibis_output_type = bigframes.core.compile.ibis_types.ibis_type_from_type_kind(
routine.return_type.type_kind
ibis_output_type = (
bigframes.core.compile.ibis_types.ibis_type_from_bigquery_type(
routine.return_type
)
)
else:
raise ReturnTypeMissingError
Expand All @@ -82,8 +84,8 @@ def ibis_signature_from_routine(routine: bigquery.Routine) -> _utils.IbisSignatu
return _utils.IbisSignature(
parameter_names=[arg.name for arg in routine.arguments],
input_types=[
bigframes.core.compile.ibis_types.ibis_type_from_type_kind(
arg.data_type.type_kind
bigframes.core.compile.ibis_types.ibis_type_from_bigquery_type(
arg.data_type
)
if arg.data_type
else None
Expand Down Expand Up @@ -233,6 +235,8 @@ def func(*bigframes_args, **bigframes_kwargs):
else ibis_signature.output_type
)

func.bigframes_bigquery_function_output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(ibis_signature.output_type) # type: ignore

func.is_row_processor = is_row_processor # type: ignore
func.ibis_node = node # type: ignore
return func
40 changes: 9 additions & 31 deletions bigframes/operations/remote_function_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import dataclasses
import typing

from bigframes import dtypes
from bigframes.operations import base_ops


Expand All @@ -31,17 +30,10 @@ def expensive(self) -> bool:

def output_type(self, *input_types):
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
if hasattr(self.func, "output_dtype"):
if dtypes.is_array_like(self.func.output_dtype):
# TODO(b/284515241): remove this special handling to support
# array output types once BQ remote functions support ARRAY.
# Until then, use json serialized strings at the remote function
# level, and parse that to the intended output type at the
# bigframes level.
return dtypes.STRING_DTYPE
return self.func.output_dtype
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
return self.func.bigframes_bigquery_function_output_dtype
else:
raise AttributeError("output_dtype not defined")
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")


@dataclasses.dataclass(frozen=True)
Expand All @@ -55,17 +47,10 @@ def expensive(self) -> bool:

def output_type(self, *input_types):
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
if hasattr(self.func, "output_dtype"):
if dtypes.is_array_like(self.func.output_dtype):
# TODO(b/284515241): remove this special handling to support
# array output types once BQ remote functions support ARRAY.
# Until then, use json serialized strings at the remote function
# level, and parse that to the intended output type at the
# bigframes level.
return dtypes.STRING_DTYPE
return self.func.output_dtype
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
return self.func.bigframes_bigquery_function_output_dtype
else:
raise AttributeError("output_dtype not defined")
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")


@dataclasses.dataclass(frozen=True)
Expand All @@ -79,14 +64,7 @@ def expensive(self) -> bool:

def output_type(self, *input_types):
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
if hasattr(self.func, "output_dtype"):
if dtypes.is_array_like(self.func.output_dtype):
# TODO(b/284515241): remove this special handling to support
# array output types once BQ remote functions support ARRAY.
# Until then, use json serialized strings at the remote function
# level, and parse that to the intended output type at the
# bigframes level.
return dtypes.STRING_DTYPE
return self.func.output_dtype
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
return self.func.bigframes_bigquery_function_output_dtype
else:
raise AttributeError("output_dtype not defined")
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
18 changes: 12 additions & 6 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1545,9 +1545,12 @@ def apply(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
)

# if the output is an array, reconstruct it from the json serialized
# string form
if bigframes.dtypes.is_array_like(func.output_dtype):
# If the result type is string but the function output is intended to
# be an array, reconstruct the array from the string assuming it is a
# json serialized form of the array.
if bigframes.dtypes.is_string_like(
result_series.dtype
) and bigframes.dtypes.is_array_like(func.output_dtype):
import bigframes.bigquery as bbq

result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
Expand Down Expand Up @@ -1585,9 +1588,12 @@ def combine(
other, ops.BinaryRemoteFunctionOp(func=func)
)

# if the output is an array, reconstruct it from the json serialized
# string form
if bigframes.dtypes.is_array_like(func.output_dtype):
# If the result type is string but the function output is intended to
# be an array, reconstruct the array from the string assuming it is a
# json serialized form of the array.
if bigframes.dtypes.is_string_like(
result_series.dtype
) and bigframes.dtypes.is_array_like(func.output_dtype):
import bigframes.bigquery as bbq

result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
Expand Down
5 changes: 5 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ def table_id_unique(dataset_id: str):
return f"{dataset_id}.{prefixer.create_prefix()}"


@pytest.fixture(scope="function")
def routine_id_unique(dataset_id: str):
return f"{dataset_id}.{prefixer.create_prefix()}"


@pytest.fixture(scope="session")
def scalars_schema(bigquery_client: bigquery.Client):
# TODO(swast): Add missing scalar data types such as BIGNUMERIC.
Expand Down
4 changes: 4 additions & 0 deletions tests/system/large/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,10 @@ def foo(x, y, z):
)
)
)
assert (
getattr(foo, "bigframes_bigquery_function_output_dtype")
== bigframes.dtypes.STRING_DTYPE
)

# Fails to apply on dataframe with incompatible number of columns
with pytest.raises(
Expand Down
Loading