feat: Support inlining any small data instead of uploading #1584

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions bigframes/core/array_value.py
@@ -60,6 +60,7 @@ class ArrayValue:

@classmethod
def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
# TODO: we may need to adapt some unruly types, and even cast afterwards in BigQuery
adapted_table = local_data.adapt_pa_table(arrow_table)
schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)

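The TODO above concerns `adapt_pa_table`, which normalizes incoming Arrow types before a BigQuery-compatible schema is derived. A minimal sketch of the kind of adaptation involved, assuming a hypothetical upcast of narrow integer types (the real helper's rules may differ):

```python
import pyarrow as pa

# Hypothetical illustration only: BigQuery has a single INT64 integer type,
# so a narrow column like int8 would need upcasting before schema mapping.
table = pa.table({"x": pa.array([1, 2, 3], type=pa.int8())})
adapted = table.cast(pa.schema([pa.field("x", pa.int64())]))
print(adapted.schema)  # x: int64
```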
2 changes: 1 addition & 1 deletion bigframes/core/blocks.py
@@ -176,7 +176,6 @@ def from_local(
*,
cache_transpose: bool = True,
) -> Block:
# Assumes caller has already converted datatypes to bigframes ones.
pd_data = data
column_labels = pd_data.columns
index_labels = list(pd_data.index.names)
@@ -187,6 +186,7 @@

pd_data = pd_data.set_axis(column_ids, axis=1)
pd_data = pd_data.reset_index(names=index_ids)
# TODO: We need to
as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
block = cls(
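`from_local` folds the index into ordinary columns before handing the frame to Arrow, since `pa.Table.from_pandas(..., preserve_index=False)` drops the pandas index. A small sketch of that round-trip (names are illustrative):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"a": [1, 2]}, index=pd.Index([10, 20], name="idx"))
df = df.reset_index(names=["idx"])  # index becomes a regular column
table = pa.Table.from_pandas(df, preserve_index=False)
print(table.column_names)  # ['idx', 'a'] -- the index survives as a column
```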
2 changes: 1 addition & 1 deletion bigframes/core/compile/explode.py
@@ -38,7 +38,7 @@ def explode_unordered(
bigframes_vendored.ibis.greatest(
1, # We always want at least 1 element to fill in NULLs for empty arrays.
bigframes_vendored.ibis.least(
*[table[column_id].length() for column_id in column_ids]
*[table[column_id].length().fill_null(1) for column_id in column_ids]
),
),
1,
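The `fill_null(1)` addition changes the per-row repeat count: previously a NULL array made the whole `least(...)` NULL and collapsed the row. A plain-Python sketch of the expression's semantics (not the compiled SQL):

```python
from typing import Optional, Sequence

def repeat_count(lengths: Sequence[Optional[int]]) -> int:
    # Mirrors greatest(1, least(length(col).fill_null(1), ...)):
    # NULL lengths count as 1, and empty arrays still emit one NULL row.
    filled = [1 if n is None else n for n in lengths]
    return max(1, min(filled))

assert repeat_count([None, 3]) == 1  # NULL array no longer nulls out the count
assert repeat_count([0, 5]) == 1     # empty array keeps the row alive
assert repeat_count([2, 4]) == 2     # aligned explode follows the shortest array
```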
46 changes: 8 additions & 38 deletions bigframes/session/__init__.py
@@ -794,13 +794,12 @@ def _read_pandas(
)

if write_engine == "default":
try:
inline_df = self._read_pandas_inline(pandas_dataframe)
return inline_df
except ValueError:
pass
return self._read_pandas_load_job(pandas_dataframe, api_name)
elif write_engine == "bigquery_inline":
is_df_large = (
pandas_dataframe.memory_usage(deep=True).sum() > MAX_INLINE_DF_BYTES
)
write_engine = "bigquery_load" if is_df_large else "bigquery_inline"

if write_engine == "bigquery_inline":
return self._read_pandas_inline(pandas_dataframe)
elif write_engine == "bigquery_load":
return self._read_pandas_load_job(pandas_dataframe, api_name)
@@ -814,37 +813,8 @@ def _read_pandas_inline(
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

memory_usage = pandas_dataframe.memory_usage(deep=True).sum()
if memory_usage > MAX_INLINE_DF_BYTES:
raise ValueError(
f"DataFrame size ({memory_usage} bytes) exceeds the maximum allowed "
f"for inline data ({MAX_INLINE_DF_BYTES} bytes)."
)

try:
local_block = blocks.Block.from_local(pandas_dataframe, self)
inline_df = dataframe.DataFrame(local_block)
except (
pa.ArrowInvalid, # Thrown by arrow for unsupported types, such as geo.
pa.ArrowTypeError, # Thrown by arrow for types without mapping (geo).
ValueError, # Thrown by ibis for some unhandled types
TypeError, # Not all types handleable by local code path
) as exc:
raise ValueError(
f"Could not convert with a BigQuery type: `{exc}`. "
) from exc

# Make sure all types are inlinable to avoid escaping errors.
inline_types = inline_df._block.expr.schema.dtypes
noninlinable_types = [
dtype for dtype in inline_types if dtype not in INLINABLE_DTYPES
]
if len(noninlinable_types) != 0:
raise ValueError(
f"Could not inline with a BigQuery type: `{noninlinable_types}`. "
f"{constants.FEEDBACK_LINK}"
)

local_block = blocks.Block.from_local(pandas_dataframe, self)
inline_df = dataframe.DataFrame(local_block)
return inline_df

def _read_pandas_load_job(
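With the try/except fallback gone, the `default` engine now decides up front from the frame's deep memory usage. A hedged sketch of that decision (the `MAX_INLINE_DF_BYTES` value here is illustrative, not the library's actual constant):

```python
import pandas as pd

MAX_INLINE_DF_BYTES = 5000  # illustrative threshold only

def choose_write_engine(df: pd.DataFrame, write_engine: str = "default") -> str:
    # Small frames are inlined into the query text; large ones use a load job.
    if write_engine == "default":
        is_large = df.memory_usage(deep=True).sum() > MAX_INLINE_DF_BYTES
        return "bigquery_load" if is_large else "bigquery_inline"
    return write_engine

print(choose_write_engine(pd.DataFrame({"x": range(4)})))  # bigquery_inline
```

Note that `_read_pandas_inline` itself no longer validates size or dtype inlinability, which is why the corresponding unit tests are removed below.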
4 changes: 4 additions & 0 deletions tests/system/small/test_dataframe.py
@@ -5288,6 +5288,10 @@ def test_dataframe_explode(col_names, ignore_index, session):
bf_materialized = bf_result.to_pandas()
execs_post = metrics.execution_count

print("bigframes")
print(bf_materialized)
print("pandas")
print(pd_result)
pd.testing.assert_frame_equal(
bf_materialized,
pd_result,
27 changes: 1 addition & 26 deletions tests/unit/session/test_session.py
@@ -23,7 +23,6 @@
import google.cloud.bigquery
import google.cloud.bigquery.table
import pandas as pd
import pyarrow as pa
import pytest

import bigframes
@@ -462,32 +461,8 @@ def today(cls):
resources.create_bigquery_session()


@mock.patch("bigframes.session.MAX_INLINE_DF_BYTES", 1)
def test_read_pandas_inline_exceeds_limit_raises_error():
session = resources.create_bigquery_session()
pd_df = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
with pytest.raises(
ValueError,
match=r"DataFrame size \(.* bytes\) exceeds the maximum allowed for inline data \(1 bytes\)\.",
):
session.read_pandas(pd_df, write_engine="bigquery_inline")


def test_read_pandas_inline_w_interval_type_raises_error():
session = resources.create_bigquery_session()
df = pd.DataFrame(pd.arrays.IntervalArray.from_breaks([0, 10, 20, 30, 40, 50]))
with pytest.raises(ValueError, match="Could not convert with a BigQuery type: "):
with pytest.raises(TypeError):
session.read_pandas(df, write_engine="bigquery_inline")


def test_read_pandas_inline_w_noninlineable_type_raises_error():
session = resources.create_bigquery_session()
data = [
[1, 2, 3],
[4, 5],
None,
[6, 7, 8, 9],
]
s = pd.Series(data, dtype=pd.ArrowDtype(pa.list_(pa.int64())))
with pytest.raises(ValueError, match="Could not inline with a BigQuery type:"):
session.read_pandas(s, write_engine="bigquery_inline")
@@ -763,15 +763,17 @@ def visit_DefaultLiteral(self, op, *, value, dtype):
elif dtype.is_date():
return self.f.datefromparts(value.year, value.month, value.day)
elif dtype.is_array():
# array type is ambiguous if no elements
value_type = dtype.value_type
return self.f.array(
values = self.f.array(
*(
self.visit_Literal(
ops.Literal(v, value_type), value=v, dtype=value_type
)
for v in value
)
)
return values if len(value) > 0 else self.cast(values, dtype)
elif dtype.is_map():
key_type = dtype.key_type
keys = self.f.array(
@@ -804,6 +806,8 @@ def visit_DefaultLiteral(self, op, *, value, dtype):
return sge.Struct.from_arg_list(items)
elif dtype.is_uuid():
return self.cast(str(value), dtype)
elif dtype.is_json():
return sge.ParseJSON(this=sge.convert(str(value)))
elif dtype.is_geospatial():
args = [value.wkt]
if (srid := dtype.srid) is not None:
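Two literal paths change above: an empty array literal gets an explicit cast, because its element type is otherwise ambiguous, and JSON values are routed through `PARSE_JSON`. A sketch of the generated SQL using sqlglot directly (the outputs in comments are assumptions about the BigQuery dialect's rendering):

```python
import sqlglot.expressions as sge

# Empty array: the cast pins down the element type.
empty = sge.Cast(
    this=sge.Array(expressions=[]),
    to=sge.DataType.build("ARRAY<INT64>", dialect="bigquery"),
)
print(empty.sql(dialect="bigquery"))  # e.g. CAST([] AS ARRAY<INT64>)

# JSON literal: the serialized value is wrapped in PARSE_JSON.
json_lit = sge.ParseJSON(this=sge.convert('{"a": 1}'))
print(json_lit.sql(dialect="bigquery"))  # e.g. PARSE_JSON('{"a": 1}')
```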