Skip to content

feat: support STRUCT data type with Series.struct.field to extract child fields #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 3, 2023
3 changes: 3 additions & 0 deletions .kokoro/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ cd "${PROJECT_ROOT}"
# Disable buffering, so that the logs stream through.
export PYTHONUNBUFFERED=1

# Workaround https://github.com/pytest-dev/pytest/issues/9567
export PY_IGNORE_IMPORTMISMATCH=1

# Debug: show build environment
env | grep KOKORO

Expand Down
10 changes: 9 additions & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,15 @@ def __init__(
columns=columns, # type:ignore
dtype=dtype, # type:ignore
)
if pd_dataframe.size < MAX_INLINE_DF_SIZE:
if (
pd_dataframe.size < MAX_INLINE_DF_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pandas.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
66 changes: 62 additions & 4 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@

BIDIRECTIONAL_MAPPINGS: Iterable[Tuple[IbisDtype, Dtype]] = (
(ibis_dtypes.boolean, pd.BooleanDtype()),
(ibis_dtypes.date, pd.ArrowDtype(pa.date32())),
(ibis_dtypes.float64, pd.Float64Dtype()),
(ibis_dtypes.int64, pd.Int64Dtype()),
(ibis_dtypes.string, pd.StringDtype(storage="pyarrow")),
(ibis_dtypes.date, pd.ArrowDtype(pa.date32())),
(ibis_dtypes.time, pd.ArrowDtype(pa.time64("us"))),
(ibis_dtypes.Timestamp(timezone=None), pd.ArrowDtype(pa.timestamp("us"))),
(
Expand All @@ -100,6 +100,19 @@
pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}

# Direct mapping from scalar Ibis dtypes to their pyarrow equivalents.
# Compound types (Array, Struct) are not listed here; they are converted
# recursively by ibis_dtype_to_arrow_dtype / arrow_dtype_to_ibis_dtype.
IBIS_TO_ARROW: Dict[ibis_dtypes.DataType, pa.DataType] = {
    ibis_dtypes.boolean: pa.bool_(),
    ibis_dtypes.date: pa.date32(),
    ibis_dtypes.float64: pa.float64(),
    ibis_dtypes.int64: pa.int64(),
    ibis_dtypes.string: pa.string(),
    ibis_dtypes.time: pa.time64("us"),
    ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
    ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
}

# Inverse lookup of IBIS_TO_ARROW, used for Arrow -> Ibis conversion.
ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}

IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}
Expand Down Expand Up @@ -148,11 +161,12 @@ def ibis_dtype_to_bigframes_dtype(
# Special cases: Ibis supports variations on these types, but currently
# our IO returns them as objects. Eventually, we should support them as
# ArrowDType (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array) or isinstance(
ibis_dtype, ibis_dtypes.Struct
):
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
elif isinstance(ibis_dtype, ibis_dtypes.Null):
Expand All @@ -164,6 +178,29 @@ def ibis_dtype_to_bigframes_dtype(
)


def ibis_dtype_to_arrow_dtype(ibis_dtype: ibis_dtypes.DataType) -> pa.DataType:
    """Convert an Ibis data type to the corresponding pyarrow data type.

    Array and Struct types are converted recursively; scalar types are
    looked up in ``IBIS_TO_ARROW``.

    Args:
        ibis_dtype: The Ibis data type to convert.

    Returns:
        The equivalent pyarrow data type.

    Raises:
        ValueError: If the Ibis type has no known pyarrow equivalent.
    """
    if isinstance(ibis_dtype, ibis_dtypes.Array):
        return pa.list_(ibis_dtype_to_arrow_dtype(ibis_dtype.value_type))

    if isinstance(ibis_dtype, ibis_dtypes.Struct):
        return pa.struct(
            [
                (name, ibis_dtype_to_arrow_dtype(dtype))
                for name, dtype in ibis_dtype.fields.items()
            ]
        )

    if ibis_dtype in IBIS_TO_ARROW:
        return IBIS_TO_ARROW[ibis_dtype]
    elif isinstance(ibis_dtype, ibis_dtypes.Null):
        # Fallback to STRING for NULL values for most flexibility in SQL.
        return IBIS_TO_ARROW[ibis_dtypes.string]
    else:
        raise ValueError(
            f"Unexpected Ibis data type {ibis_dtype}. {constants.FEEDBACK_LINK}"
        )


def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
"""Converts an Ibis expression to canonical type.

Expand All @@ -187,6 +224,24 @@ def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
return table.select(*casted_columns)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
    """Convert a pyarrow data type to the corresponding Ibis data type.

    Struct types are converted field-by-field, recursively; scalar types
    are looked up in ``ARROW_TO_IBIS``.

    Raises:
        ValueError: If the Arrow type has no known Ibis equivalent.
    """
    if pa.types.is_struct(arrow_dtype):
        struct_dtype = typing.cast(pa.StructType, arrow_dtype)
        converted_fields = [
            (field.name, arrow_dtype_to_ibis_dtype(field.type))
            for field in struct_dtype
        ]
        return ibis_dtypes.Struct.from_tuples(converted_fields)

    ibis_dtype = ARROW_TO_IBIS.get(arrow_dtype)
    if ibis_dtype is None:
        raise ValueError(
            f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
        )
    return ibis_dtype


def bigframes_dtype_to_ibis_dtype(
bigframes_dtype: Union[DtypeString, Dtype, np.dtype[Any]]
) -> ibis_dtypes.DataType:
Expand All @@ -202,6 +257,9 @@ def bigframes_dtype_to_ibis_dtype(
Raises:
ValueError: If passed a dtype not supported by BigQuery DataFrames.
"""
if isinstance(bigframes_dtype, pd.ArrowDtype):
return arrow_dtype_to_ibis_dtype(bigframes_dtype.pyarrow_dtype)

type_string = str(bigframes_dtype)
if type_string in BIGFRAMES_STRING_TO_BIGFRAMES:
bigframes_dtype = BIGFRAMES_STRING_TO_BIGFRAMES[
Expand Down
10 changes: 9 additions & 1 deletion bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,15 @@ def __init__(
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if pd_dataframe.size < MAX_INLINE_SERIES_SIZE:
if (
pd_dataframe.size < MAX_INLINE_SERIES_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pd.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
61 changes: 61 additions & 0 deletions bigframes/operations/structs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import ibis.expr.types as ibis_types

import bigframes.dataframe
import bigframes.operations
import bigframes.operations.base
import bigframes.series
import third_party.bigframes_vendored.pandas.core.arrays.arrow.accessors as vendoracessors


class StructField(bigframes.operations.UnaryOp):
    """Unary op that extracts one child field from a STRUCT value."""

    def __init__(self, name_or_index: str | int):
        # The target field, identified either by name or by position.
        self._name_or_index = name_or_index

    def _as_ibis(self, x: ibis_types.Value):
        struct_value = typing.cast(ibis_types.StructValue, x)
        key = self._name_or_index
        # Resolve a positional index to the corresponding field name.
        name = key if isinstance(key, str) else struct_value.names[key]
        return struct_value[name].name(name)


class StructAccessor(
    bigframes.operations.base.SeriesMethods, vendoracessors.StructAccessor
):
    __doc__ = vendoracessors.StructAccessor.__doc__

    def field(self, name_or_index: str | int) -> bigframes.series.Series:
        # Extract the child field, then rename the result to the field's name.
        extracted = self._apply_unary_op(StructField(name_or_index))
        if isinstance(name_or_index, str):
            field_name = name_or_index
        else:
            # Look up the name of the positionally-selected field.
            field_name = self._dtype.pyarrow_dtype[name_or_index].name
        return extracted.rename(field_name)

    def explode(self) -> bigframes.dataframe.DataFrame:
        # Imported here to avoid a circular import at module load time.
        import bigframes.pandas

        pa_type = self._dtype.pyarrow_dtype
        columns = [self.field(index) for index in range(pa_type.num_fields)]
        return bigframes.pandas.concat(columns, axis="columns")
5 changes: 5 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import bigframes.operations.base
import bigframes.operations.datetimes as dt
import bigframes.operations.strings as strings
import bigframes.operations.structs as structs
import third_party.bigframes_vendored.pandas.core.series as vendored_pandas_series

LevelType = typing.Union[str, int]
Expand Down Expand Up @@ -118,6 +119,10 @@ def query_job(self) -> Optional[bigquery.QueryJob]:
self._set_internal_query_job(self._compute_dry_run())
return self._query_job

@property
def struct(self) -> structs.StructAccessor:
    """Accessor for methods on struct-typed Series (e.g. ``.struct.field``)."""
    return structs.StructAccessor(self._block)

def _set_internal_query_job(self, query_job: bigquery.QueryJob):
self._query_job = query_job

Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def doctest(session: nox.sessions.Session):
run_system(
session=session,
prefix_name="doctest",
extra_pytest_options=("--doctest-modules",),
extra_pytest_options=("--doctest-modules", "third_party"),
test_folder="bigframes",
check_cov=True,
)
Expand Down
14 changes: 13 additions & 1 deletion tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,19 @@ def test_get_dtypes_array_struct(session):
dtypes = df.dtypes
pd.testing.assert_series_equal(
dtypes,
pd.Series({"array_column": np.dtype("O"), "struct_column": np.dtype("O")}),
pd.Series(
{
"array_column": np.dtype("O"),
"struct_column": pd.ArrowDtype(
pa.struct(
[
("string_field", pa.string()),
("float_field", pa.float64()),
]
)
),
}
),
)


Expand Down
64 changes: 64 additions & 0 deletions tests/unit/test_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,70 @@ def test_ibis_float32_raises_unexpected_datatype():
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_dtypes.float32)


# Pairs of (ibis dtype, equivalent arrow dtype) used to exercise the dtype
# conversion helpers in both directions, including nested struct types.
IBIS_ARROW_DTYPES = (
    (ibis_dtypes.boolean, pa.bool_()),
    (ibis_dtypes.date, pa.date32()),
    (ibis_dtypes.Timestamp(), pa.timestamp("us")),
    (ibis_dtypes.float64, pa.float64()),
    (
        ibis_dtypes.Timestamp(timezone="UTC"),
        pa.timestamp("us", tz="UTC"),
    ),
    (
        ibis_dtypes.Struct.from_tuples(
            [
                ("name", ibis_dtypes.string()),
                ("version", ibis_dtypes.int64()),
            ]
        ),
        pa.struct(
            [
                ("name", pa.string()),
                ("version", pa.int64()),
            ]
        ),
    ),
    (
        ibis_dtypes.Struct.from_tuples(
            [
                (
                    "nested",
                    ibis_dtypes.Struct.from_tuples(
                        [
                            ("field", ibis_dtypes.string()),
                        ]
                    ),
                ),
            ]
        ),
        pa.struct(
            [
                (
                    "nested",
                    pa.struct(
                        [
                            ("field", pa.string()),
                        ]
                    ),
                ),
            ]
        ),
    ),
)


@pytest.mark.parametrize(("ibis_dtype", "arrow_dtype"), IBIS_ARROW_DTYPES)
def test_arrow_dtype_to_ibis_dtype(ibis_dtype, arrow_dtype):
    """Arrow -> Ibis conversion round-trips each known dtype pair."""
    assert bigframes.dtypes.arrow_dtype_to_ibis_dtype(arrow_dtype) == ibis_dtype


@pytest.mark.parametrize(("ibis_dtype", "arrow_dtype"), IBIS_ARROW_DTYPES)
def test_ibis_dtype_to_arrow_dtype(ibis_dtype, arrow_dtype):
    """Ibis -> Arrow conversion round-trips each known dtype pair."""
    assert bigframes.dtypes.ibis_dtype_to_arrow_dtype(ibis_dtype) == arrow_dtype


@pytest.mark.parametrize(
["bigframes_dtype", "ibis_dtype"],
[
Expand Down
Empty file.
Empty file.
Loading