From 3fca092253898eacfa6529e3c802f3fcd49dca24 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 9 Dec 2024 16:23:38 -0600 Subject: [PATCH 01/12] feat: add `write_engine` parameter to `read_FORMATNAME` methods to control how data is written to BigQuery --- bigframes/dtypes.py | 15 +- bigframes/pandas/io/api.py | 39 ++++- bigframes/session/__init__.py | 158 ++++++++++++++---- bigframes/session/_io/pandas.py | 51 +++++- bigframes/session/loader.py | 109 ++++++++---- bigframes/session/validation.py | 36 ++++ setup.py | 7 +- testing/constraints-3.9.txt | 3 +- tests/system/small/test_session.py | 12 +- tests/unit/core/test_dtypes.py | 2 +- tests/unit/session/test_session.py | 23 +++ third_party/bigframes_vendored/constants.py | 20 +++ .../pandas/io/parsers/readers.py | 24 +++ .../bigframes_vendored/pandas/io/pickle.py | 12 ++ 14 files changed, 432 insertions(+), 79 deletions(-) create mode 100644 bigframes/session/validation.py diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index bc5b89b779..99de5febed 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -343,12 +343,19 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype: return pd.ArrowDtype(arrow_dtype) if pa.types.is_struct(arrow_dtype): return pd.ArrowDtype(arrow_dtype) + + # BigFrames doesn't distinguish between string and large_string because the + # largest string (2 GB) is already larger than the largest BigQuery row. + if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype): + return STRING_DTYPE + if arrow_dtype == pa.null(): return DEFAULT_DTYPE - else: - raise ValueError( - f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}" - ) + + # No other types matched. + raise ValueError( + f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}" + ) _BIGFRAMES_TO_ARROW = { diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 4e08b3ef5e..4ad6f8485c 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -30,6 +30,7 @@ Union, ) +import bigframes_vendored.constants as constants import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq from google.cloud import bigquery import numpy @@ -105,6 +106,7 @@ def read_csv( Literal["c", "python", "pyarrow", "python-fwf", "bigquery"] ] = None, encoding: Optional[str] = None, + write_engine: constants.WriteEngineType = "default", **kwargs, ) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( @@ -118,6 +120,7 @@ def read_csv( dtype=dtype, engine=engine, encoding=encoding, + write_engine=write_engine, **kwargs, ) @@ -135,6 +138,7 @@ def read_json( encoding: Optional[str] = None, lines: bool = False, engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson", + write_engine: constants.WriteEngineType = "default", **kwargs, ) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( @@ -145,6 +149,7 @@ def read_json( encoding=encoding, lines=lines, engine=engine, + write_engine=write_engine, **kwargs, ) @@ -245,24 +250,41 @@ def read_gbq_table( @typing.overload -def read_pandas(pandas_dataframe: pandas.DataFrame) -> bigframes.dataframe.DataFrame: +def read_pandas( + pandas_dataframe: pandas.DataFrame, + *, + write_engine: constants.WriteEngineType = "default", +) -> bigframes.dataframe.DataFrame: ... @typing.overload -def read_pandas(pandas_dataframe: pandas.Series) -> bigframes.series.Series: +def read_pandas( + pandas_dataframe: pandas.Series, + *, + write_engine: constants.WriteEngineType = "default", +) -> bigframes.series.Series: ... @typing.overload -def read_pandas(pandas_dataframe: pandas.Index) -> bigframes.core.indexes.Index: +def read_pandas( + pandas_dataframe: pandas.Index, + *, + write_engine: constants.WriteEngineType = "default", +) -> bigframes.core.indexes.Index: ... -def read_pandas(pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index]): +def read_pandas( + pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index], + *, + write_engine: constants.WriteEngineType = "default", +): return global_session.with_default_session( bigframes.session.Session.read_pandas, pandas_dataframe, + write_engine=write_engine, ) @@ -273,12 +295,15 @@ def read_pickle( filepath_or_buffer: FilePath | ReadPickleBuffer, compression: CompressionOptions = "infer", storage_options: StorageOptions = None, + *, + write_engine: constants.WriteEngineType = "default", ): return global_session.with_default_session( bigframes.session.Session.read_pickle, filepath_or_buffer=filepath_or_buffer, compression=compression, storage_options=storage_options, + write_engine=write_engine, ) @@ -286,12 +311,16 @@ def read_pickle( def read_parquet( - path: str | IO["bytes"], *, engine: str = "auto" + path: str | IO["bytes"], + *, + engine: str = "auto", + write_engine: constants.WriteEngineType = "default", ) -> bigframes.dataframe.DataFrame: return global_session.with_default_session( bigframes.session.Session.read_parquet, path, engine=engine, + write_engine=write_engine, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 27707c21fd..20ace90bda 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -79,6 +79,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temp_storage +import bigframes.session.validation import bigframes.version # Avoid circular imports. @@ -607,20 +608,36 @@ def read_gbq_model(self, model_name: str): @typing.overload def read_pandas( - self, pandas_dataframe: pandas.Index + self, + pandas_dataframe: pandas.Index, + *, + write_engine: constants.WriteEngineType = "default", ) -> bigframes.core.indexes.Index: ... @typing.overload - def read_pandas(self, pandas_dataframe: pandas.Series) -> bigframes.series.Series: + def read_pandas( + self, + pandas_dataframe: pandas.Series, + *, + write_engine: constants.WriteEngineType = "default", + ) -> bigframes.series.Series: ... @typing.overload - def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame: + def read_pandas( + self, + pandas_dataframe: pandas.DataFrame, + *, + write_engine: constants.WriteEngineType = "default", + ) -> dataframe.DataFrame: ... def read_pandas( - self, pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index] + self, + pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index], + *, + write_engine: constants.WriteEngineType = "default", ): """Loads DataFrame from a pandas DataFrame. @@ -651,6 +668,16 @@ def read_pandas( Args: pandas_dataframe (pandas.DataFrame, pandas.Series, or pandas.Index): a pandas DataFrame/Series/Index object to be loaded. + write_engine (str): + How data should be written to BigQuery (if at all). Supported + values: + + * "default": + Select either "bigquery_inline" or "bigquery_load", + depending on data size. + * "bigquery_inline": Inline data in BigQuery SQL. + * "bigquery_load": Use a BigQuery load job. + * "bigquery_streaming": Use the BigQuery streaming JSON API. Returns: An equivalent bigframes.pandas.(DataFrame/Series/Index) object @@ -659,24 +686,35 @@ def read_pandas( # Try to handle non-dataframe pandas objects as well if isinstance(pandas_dataframe, pandas.Series): - bf_df = self._read_pandas(pandas.DataFrame(pandas_dataframe), "read_pandas") + bf_df = self._read_pandas( + pandas.DataFrame(pandas_dataframe), + "read_pandas", + write_engine=write_engine, + ) bf_series = series.Series(bf_df._block) # wrapping into df can set name to 0 so reset to original object name bf_series.name = pandas_dataframe.name return bf_series if isinstance(pandas_dataframe, pandas.Index): return self._read_pandas( - pandas.DataFrame(index=pandas_dataframe), "read_pandas" + pandas.DataFrame(index=pandas_dataframe), + "read_pandas, write_engine=write_engine", ).index if isinstance(pandas_dataframe, pandas.DataFrame): - return self._read_pandas(pandas_dataframe, "read_pandas") + return self._read_pandas( + pandas_dataframe, "read_pandas", write_engine=write_engine + ) else: raise ValueError( f"read_pandas() expects a pandas.DataFrame, but got a {type(pandas_dataframe)}" ) def _read_pandas( - self, pandas_dataframe: pandas.DataFrame, api_name: str + self, + pandas_dataframe: pandas.DataFrame, + api_name: str, + *, + write_engine: constants.WriteEngineType = "default", ) -> dataframe.DataFrame: import bigframes.dataframe as dataframe @@ -686,18 +724,23 @@ def _read_pandas( "bigframes.pandas.DataFrame." ) - inline_df = self._read_pandas_inline(pandas_dataframe) - if inline_df is not None: - return inline_df - try: - return self._loader.read_pandas_load_job(pandas_dataframe, api_name) - except pa.ArrowInvalid as e: - raise pa.ArrowInvalid( - f"Could not convert with a BigQuery type: `{e}`. " - ) from e + if write_engine == "default": + inline_df = self._read_pandas_inline(pandas_dataframe, should_raise=False) + if inline_df is not None: + return inline_df + return self._read_pandas_load_job(pandas_dataframe, api_name) + elif write_engine == "bigquery_inline": + # Regarding the type: ignore, with should_raise=True, this should never return None. + return self._read_pandas_inline(pandas_dataframe, should_raise=True) # type: ignore + elif write_engine == "bigquery_load": + return self._read_pandas_load_job(pandas_dataframe, api_name) + elif write_engine == "bigquery_streaming": + return self._read_pandas_streaming(pandas_dataframe) + else: + raise ValueError(f"Got unexpected write_engine '{write_engine}'") def _read_pandas_inline( - self, pandas_dataframe: pandas.DataFrame + self, pandas_dataframe: pandas.DataFrame, should_raise=False ) -> Optional[dataframe.DataFrame]: import bigframes.dataframe as dataframe @@ -707,18 +750,50 @@ def _read_pandas_inline( try: local_block = blocks.Block.from_local(pandas_dataframe, self) inline_df = dataframe.DataFrame(local_block) - except pa.ArrowInvalid: # Thrown by arrow for unsupported types, such as geo. - return None - except ValueError: # Thrown by ibis for some unhandled types - return None - except pa.ArrowTypeError: # Thrown by arrow for types without mapping (geo). - return None + except ( + pa.ArrowInvalid, # Thrown by arrow for unsupported types, such as geo. + pa.ArrowTypeError, # Thrown by arrow for types without mapping (geo). + ValueError, # Thrown by ibis for some unhandled types + ) as exc: + if should_raise: + raise ValueError( + f"Could not convert with a BigQuery type: `{exc}`. " + ) from exc + else: + return None inline_types = inline_df._block.expr.schema.dtypes # Ibis has problems escaping bytes literals, which will cause syntax errors server-side. - if all(dtype in INLINABLE_DTYPES for dtype in inline_types): + noninlinable_types = [ + dtype for dtype in inline_types if dtype not in INLINABLE_DTYPES + ] + if len(noninlinable_types) == 0: return inline_df - return None + + if should_raise: + raise ValueError( + f"Could not inline with a BigQuery type: `{noninlinable_types}`. " + ) + else: + return None + + def _read_pandas_load_job( + self, + pandas_dataframe: pandas.DataFrame, + api_name: str, + ) -> dataframe.DataFrame: + try: + return self._loader.read_pandas_load_job(pandas_dataframe, api_name) + except pa.ArrowInvalid as e: + raise pa.ArrowInvalid( + f"Could not convert with a BigQuery type: `{e}`. " + ) from e + + def _read_pandas_streaming( + self, + pandas_dataframe: pandas.DataFrame, + ) -> dataframe.DataFrame: + return self._loader.read_pandas_streaming(pandas_dataframe) def read_csv( self, @@ -754,8 +829,13 @@ def read_csv( Literal["c", "python", "pyarrow", "python-fwf", "bigquery"] ] = None, encoding: Optional[str] = None, + write_engine: constants.WriteEngineType = "default", **kwargs, ) -> dataframe.DataFrame: + bigframes.session.validation.validate_engine_compatibility( + engine=engine, + write_engine=write_engine, + ) table = self._temp_storage_manager._random_table() if engine is not None and engine == "bigquery": @@ -866,13 +946,15 @@ def read_csv( encoding=encoding, **kwargs, ) - return self._read_pandas(pandas_df, "read_csv") # type: ignore + return self._read_pandas(pandas_df, api_name="read_csv", write_engine=write_engine) # type: ignore def read_pickle( self, filepath_or_buffer: FilePath | ReadPickleBuffer, compression: CompressionOptions = "infer", storage_options: StorageOptions = None, + *, + write_engine: constants.WriteEngineType = "default", ): pandas_obj = pandas.read_pickle( filepath_or_buffer, @@ -885,14 +967,21 @@ def read_pickle( pandas_obj.name = "0" bigframes_df = self._read_pandas(pandas_obj.to_frame(), "read_pickle") return bigframes_df[bigframes_df.columns[0]] - return self._read_pandas(pandas_obj, "read_pickle") + return self._read_pandas( + pandas_obj, api_name="read_pickle", write_engine=write_engine + ) def read_parquet( self, path: str | IO["bytes"], *, engine: str = "auto", + write_engine: constants.WriteEngineType = "default", ) -> dataframe.DataFrame: + bigframes.session.validation.validate_engine_compatibility( + engine=engine, + write_engine=write_engine, + ) table = self._temp_storage_manager._random_table() if engine == "bigquery": @@ -917,7 +1006,9 @@ def read_parquet( engine=engine, # type: ignore **read_parquet_kwargs, ) - return self._read_pandas(pandas_obj, "read_parquet") + return self._read_pandas( + pandas_obj, api_name="read_parquet", write_engine=write_engine + ) def read_json( self, @@ -930,8 +1021,13 @@ def read_json( encoding: Optional[str] = None, lines: bool = False, engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson", + write_engine: constants.WriteEngineType = "default", **kwargs, ) -> dataframe.DataFrame: + bigframes.session.validation.validate_engine_compatibility( + engine=engine, + write_engine=write_engine, + ) table = self._temp_storage_manager._random_table() if engine == "bigquery": @@ -998,7 +1094,9 @@ def read_json( engine=engine, **kwargs, ) - return self._read_pandas(pandas_df, "read_json") + return self._read_pandas( + pandas_df, api_name="read_json", write_engine=write_engine + ) def _check_file_size(self, filepath: str): max_size = 1024 * 1024 * 1024 # 1 GB in bytes diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index 6ceaab6915..a1f768128e 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -13,10 +13,12 @@ # limitations under the License. from __future__ import annotations -from typing import Union +import dataclasses +from typing import Collection, Union import bigframes_vendored.constants as constants import geopandas # type: ignore +import numpy as np import pandas import pandas.arrays import pyarrow # type: ignore @@ -24,9 +26,18 @@ import pyarrow.types # type: ignore import bigframes.core.schema +import bigframes.core.utils as utils import bigframes.features +@dataclasses.dataclass(frozen=True) +class DataFrameAndLabels: + df: pandas.DataFrame + column_labels: Collection + index_labels: Collection + ordering_col: str + + def _arrow_to_pandas_arrowdtype( column: pyarrow.Array, dtype: pandas.ArrowDtype ) -> pandas.Series: @@ -110,3 +121,41 @@ def arrow_to_pandas( serieses[field.name] = series return pandas.DataFrame(serieses) + + +def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndLabels: + """Convert a pandas DataFrame into something compatible with uploading to a + BigQuery table (without flexible column names enabled). + """ + col_index = pandas_dataframe.columns.copy() + col_labels, idx_labels = ( + col_index.to_list(), + pandas_dataframe.index.names, + ) + new_col_ids, new_idx_ids = utils.get_standardized_ids( + col_labels, + idx_labels, + # Loading parquet files into BigQuery with special column names + # is only supported under an allowlist. + strict=True, + ) + + # Add order column to pandas DataFrame to preserve order in BigQuery + ordering_col = "rowid" + columns = frozenset(col_labels + idx_labels) + suffix = 2 + while ordering_col in columns: + ordering_col = f"rowid_{suffix}" + suffix += 1 + + pandas_dataframe_copy = pandas_dataframe.copy() + pandas_dataframe_copy.index.names = new_idx_ids + pandas_dataframe_copy.columns = pandas.Index(new_col_ids) + pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) + + return DataFrameAndLabels( + df=pandas_dataframe_copy, + column_labels=col_labels, + index_labels=idx_labels, + ordering_col=ordering_col, + ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 21d454d72f..6299d6bcd6 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -33,27 +33,26 @@ import google.cloud.functions_v2 import google.cloud.resourcemanager_v3 import jellyfish -import numpy as np import pandas +import pandas_gbq.schema.pandas_to_bigquery import bigframes.clients import bigframes.constants import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.compile +import bigframes.core.expression as expression import bigframes.core.guid import bigframes.core.pruning import bigframes.core.schema as schemata -import bigframes.core.utils as utils - -# Even though the ibis.backends.bigquery import is unused, it's needed -# to register new and replacement ops with the Ibis BigQuery backend. import bigframes.dataframe import bigframes.dtypes import bigframes.exceptions import bigframes.formatting_helpers as formatting_helpers +import bigframes.operations.aggregations as agg_ops import bigframes.session._io.bigquery as bf_io_bigquery import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table +import bigframes.session._io.pandas as bf_io_pandas import bigframes.session.clients import bigframes.session.executor import bigframes.session.metrics @@ -134,31 +133,11 @@ def read_pandas_load_job( ) -> dataframe.DataFrame: import bigframes.dataframe as dataframe - col_index = pandas_dataframe.columns.copy() - col_labels, idx_labels = ( - col_index.to_list(), - pandas_dataframe.index.names, - ) - new_col_ids, new_idx_ids = utils.get_standardized_ids( - col_labels, - idx_labels, - # Loading parquet files into BigQuery with special column names - # is only supported under an allowlist. - strict=True, - ) - - # Add order column to pandas DataFrame to preserve order in BigQuery - ordering_col = "rowid" - columns = frozenset(col_labels + idx_labels) - suffix = 2 - while ordering_col in columns: - ordering_col = f"rowid_{suffix}" - suffix += 1 - - pandas_dataframe_copy = pandas_dataframe.copy() - pandas_dataframe_copy.index.names = new_idx_ids - pandas_dataframe_copy.columns = pandas.Index(new_col_ids) - pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) + df_and_labels = bf_io_pandas.pandas_to_bq_compatible(pandas_dataframe) + pandas_dataframe_copy = df_and_labels.df + new_idx_ids = pandas_dataframe_copy.index.names + new_col_ids = pandas_dataframe_copy.columns + ordering_col = df_and_labels.ordering_col job_config = bigquery.LoadJobConfig() # Specify the datetime dtypes, which is auto-detected as timestamp types. @@ -196,8 +175,74 @@ def read_pandas_load_job( block = blocks.Block( array_value, index_columns=new_idx_ids, - column_labels=col_index, - index_labels=idx_labels, + column_labels=df_and_labels.column_labels, + index_labels=df_and_labels.index_labels, + ) + return dataframe.DataFrame(block) + + def read_pandas_streaming( + self, + pandas_dataframe: pandas.DataFrame, + ) -> dataframe.DataFrame: + """Same as pandas_to_bigquery_load, but uses the BQ legacy streaming API.""" + import bigframes.dataframe as dataframe + + df_and_labels = bf_io_pandas.pandas_to_bq_compatible(pandas_dataframe) + pandas_dataframe_copy = df_and_labels.df + new_idx_ids = pandas_dataframe_copy.index.names + ordering_col = df_and_labels.ordering_col + schema = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( + pandas_dataframe_copy, + index=True, + ) + + destination = self._storage_manager.create_temp_table( + schema, + [ordering_col], + ) + destination_table = bigquery.Table(destination, schema=schema) + breakpoint() + # TODO(swast): Confirm that the index is written. + for errors in self._bqclient.insert_rows_from_dataframe( + destination_table, + pandas_dataframe_copy, + ): + if errors: + raise ValueError( + f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}" + ) + + array_value = ( + core.ArrayValue.from_table( + table=destination_table, + schema=schemata.ArraySchema.from_bq_table(destination_table), + session=self._session, + # Don't set the offsets column because we want to group by it. + ) + # There may be duplicate rows because of hidden retries, so use a query to + # deduplicate based on the ordering ID, which is guaranteed to be unique. + # We know that rows with same ordering ID are duplicates, + # so ANY_VALUE() is deterministic. + .aggregate( + by_column_ids=[ordering_col], + aggregations=[ + ( + expression.UnaryAggregation( + agg_ops.AnyValueOp(), + expression.deref(field.name), + ), + field.name, + ) + for field in destination_table.schema + if field.name != ordering_col + ], + ).drop_columns([ordering_col]) + ) + block = blocks.Block( + array_value, + index_columns=new_idx_ids, + column_labels=df_and_labels.column_labels, + index_labels=df_and_labels.index_labels, ) return dataframe.DataFrame(block) diff --git a/bigframes/session/validation.py b/bigframes/session/validation.py new file mode 100644 index 0000000000..7c0f5cc92c --- /dev/null +++ b/bigframes/session/validation.py @@ -0,0 +1,36 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bigframes_vendored.constants + + +def validate_engine_compatibility(engine, write_engine): + """Raises NotImplementedError if engine is not compatible with write_engine.""" + + if engine == "bigquery" and write_engine in ( + "bigquery_inline", + "bigquery_streaming", + ): + raise NotImplementedError( + bigframes_vendored.constants.WRITE_ENGINE_ERROR_TEMPLATE.format( + engine=engine, write_engine=write_engine + ) + ) + + if engine != "bigquery" and write_engine in ("bigquery_external_table",): + raise NotImplementedError( + bigframes_vendored.constants.WRITE_ENGINE_ERROR_TEMPLATE.format( + engine=engine, write_engine=write_engine + ) + ) diff --git a/setup.py b/setup.py index 833d4fe565..f4665cda16 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ "google-auth >=2.15.0,<3.0dev", "google-cloud-bigtable >=2.24.0", "google-cloud-pubsub >=2.21.4", - "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", + "google-cloud-bigquery[bqstorage,pandas] >=3.18.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", "google-cloud-iam >=2.12.1", @@ -51,6 +51,7 @@ "jellyfish >=0.8.9", "numpy >=1.24.0", "pandas >=1.5.3", + "pandas-gbq >=0.24.0", "pyarrow >=10.0.1", "pydata-google-auth >=1.8.2", "requests >=2.27.1", @@ -67,9 +68,7 @@ ] extras = { # Optional test dependencies packages. If they're missed, may skip some tests. - "tests": [ - "pandas-gbq >=0.19.0", - ], + "tests": [], # Packages required for basic development flow. "dev": ["pytest", "pytest-mock", "pre-commit", "nox", "google-cloud-testutils"], } diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 4a9d1ae281..0e7cf966bc 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -6,7 +6,7 @@ geopandas==0.12.2 google-auth==2.15.0 google-cloud-bigtable==2.24.0 google-cloud-pubsub==2.21.4 -google-cloud-bigquery==3.16.0 +google-cloud-bigquery==3.18.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 google-cloud-iam==2.12.1 @@ -16,6 +16,7 @@ ibis-framework==9.0.0 jellyfish==0.8.9 numpy==1.24.0 pandas==1.5.3 +pandas-gbq==0.24.0 pyarrow==10.0.1 pydata-google-auth==1.8.2 requests==2.27.1 diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 4b48915d2d..5010b6fac6 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -685,7 +685,16 @@ def test_read_pandas_tokyo( @utils.skip_legacy_pandas -def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder): +@pytest.mark.parametrize( + ("write_engine",), + ( + ("default",), + ("bigquery_inline",), + ("bigquery_load",), + ("bigquery_streaming",), + ), +) +def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder, write_engine): scalars_df, _ = scalars_dfs path = gcs_folder + "test_read_csv_gcs_default_engine_w_index*.csv" read_path = utils.get_first_file_from_wildcard(path) @@ -696,6 +705,7 @@ def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder): read_path, # Convert default pandas dtypes to match BigQuery DataFrames dtypes. dtype=dtype, + write_engine=write_engine, ) # TODO(chelsealin): If we serialize the index, can more easily compare values. diff --git a/tests/unit/core/test_dtypes.py b/tests/unit/core/test_dtypes.py index ae194be83f..e25528d22b 100644 --- a/tests/unit/core/test_dtypes.py +++ b/tests/unit/core/test_dtypes.py @@ -74,7 +74,7 @@ def test_ibis_dtype_converts(ibis_dtype, bigframes_dtype): def test_ibis_timestamp_pst_raises_unexpected_datatype(): """BigQuery timestamp only supports UTC time""" - with pytest.raises(ValueError, match="Unexpected Ibis data type"): + with pytest.raises(ValueError, match="'PST'"): bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( ibis_dtypes.Timestamp(timezone="PST") ) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index b76c74654c..b9112603bd 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -161,6 +161,29 @@ def test_read_csv_pandas_engines_index_col_sequential_int64_not_supported(engine ) +@pytest.mark.parametrize( + ("engine", "write_engine"), + ( + ("bigquery", "bigquery_streaming"), + ("bigquery", "bigquery_inline"), + ), +) +def test_read_csv_with_incompatible_write_engine(engine, write_engine): + session = resources.create_bigquery_session() + + with pytest.raises( + NotImplementedError, + match=re.escape( + f"write_engine={repr(write_engine)} is incompatible with engine={repr(engine)}" + ), + ): + session.read_csv( + "gs://cloud-samples-data/bigquery/us-states/us-states.csv", + engine=engine, + write_engine=write_engine, + ) + + @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")]) def test_read_gbq_missing_parts(missing_parts_table_id): session = resources.create_bigquery_session() diff --git a/third_party/bigframes_vendored/constants.py b/third_party/bigframes_vendored/constants.py index 1effdffcbe..b7e0c36aee 100644 --- a/third_party/bigframes_vendored/constants.py +++ b/third_party/bigframes_vendored/constants.py @@ -16,6 +16,9 @@ This module should not depend on any others in the package. """ + +from typing import Literal + import bigframes_vendored.version FEEDBACK_LINK = ( @@ -29,3 +32,20 @@ "Please share this stacktrace and how you reached it with the BigQuery DataFrames team. " f"{FEEDBACK_LINK}" ) + +WRITE_ENGINE_ERROR_TEMPLATE = ( + "write_engine='{write_engine}' is incompatible with engine='{engine}'. " + f"{FEEDBACK_LINK}" +) + +# TODO(swast): Use unpack operator to avoid redundancy when Python 3.11 is +# the minimum version. +VALID_WRITE_ENGINES = [ + "default", + "bigquery_inline", + "bigquery_load", + "bigquery_streaming", +] +WriteEngineType = Literal[ + "default", "bigquery_inline", "bigquery_load", "bigquery_streaming" +] diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py index 35b2a1982a..e44ed3b130 100644 --- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py +++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py @@ -49,6 +49,7 @@ def read_csv( Literal["c", "python", "pyarrow", "python-fwf", "bigquery"] ] = None, encoding: Optional[str] = None, + write_engine="default", **kwargs, ): """Loads data from a comma-separated values (csv) file into a DataFrame. @@ -142,6 +143,17 @@ def read_csv( documentation for a comprehensive list, https://docs.python.org/3/library/codecs.html#standard-encodings The BigQuery engine only supports `UTF-8` and `ISO-8859-1`. + write_engine (str): + How data should be written to BigQuery (if at all). Supported + values: + + * "default": + Select either "bigquery_inline" or "bigquery_load", + depending on data size. + * "bigquery_inline": Inline data in BigQuery SQL. + * "bigquery_load": Use a BigQuery load job. + * "bigquery_streaming": Use the BigQuery streaming JSON API. + **kwargs: keyword arguments for `pandas.read_csv` when not using the BigQuery engine. @@ -162,6 +174,7 @@ def read_json( encoding: Optional[str] = None, lines: bool = False, engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson", + write_engine="default", **kwargs, ): """ @@ -222,6 +235,17 @@ def read_json( engine ({{"ujson", "pyarrow", "bigquery"}}, default "ujson"): Type of engine to use. If `engine="bigquery"` is specified, then BigQuery's load API will be used. Otherwise, the engine will be passed to `pandas.read_json`. + write_engine (str): + How data should be written to BigQuery (if at all). Supported + values: + + * "default": + Select either "bigquery_inline" or "bigquery_load", + depending on data size. + * "bigquery_inline": Inline data in BigQuery SQL. + * "bigquery_load": Use a BigQuery load job. + * "bigquery_streaming": Use the BigQuery streaming JSON API. + **kwargs: keyword arguments for `pandas.read_json` when not using the BigQuery engine. diff --git a/third_party/bigframes_vendored/pandas/io/pickle.py b/third_party/bigframes_vendored/pandas/io/pickle.py index 88684309f9..75bd0882a5 100644 --- a/third_party/bigframes_vendored/pandas/io/pickle.py +++ b/third_party/bigframes_vendored/pandas/io/pickle.py @@ -18,6 +18,8 @@ def read_pickle( filepath_or_buffer: FilePath | ReadPickleBuffer, compression: CompressionOptions = "infer", storage_options: StorageOptions = None, + *, + write_engine="default", ): """Load pickled BigFrames object (or any object) from file. @@ -62,6 +64,16 @@ def read_pickle( starting with “s3://”, and “gcs://”) the key-value pairs are forwarded to fsspec.open. Please see fsspec and urllib for more details, and for more examples on storage options refer here. + write_engine (str): + How data should be written to BigQuery (if at all). Supported + values: + + * "default": + Select either "bigquery_inline" or "bigquery_load", + depending on data size. + * "bigquery_inline": Inline data in BigQuery SQL. + * "bigquery_load": Use a BigQuery load job. + * "bigquery_streaming": Use the BigQuery streaming JSON API. Returns: bigframes.dataframe.DataFrame or bigframes.series.Series: same type as object From 5234abf295eee5e4454387ce156c3e736fed96c2 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 10 Dec 2024 13:15:53 -0600 Subject: [PATCH 02/12] set constraints to allow unit tests to pass --- testing/constraints-3.13.txt | 0 testing/constraints-3.14.txt | 0 testing/constraints-3.15.txt | 0 testing/constraints-3.9.txt | 2 -- 4 files changed, 2 deletions(-) create mode 100644 testing/constraints-3.13.txt create mode 100644 testing/constraints-3.14.txt create mode 100644 testing/constraints-3.15.txt diff --git a/testing/constraints-3.13.txt b/testing/constraints-3.13.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/constraints-3.14.txt b/testing/constraints-3.14.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/constraints-3.15.txt b/testing/constraints-3.15.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 49f6f900df..daa747e28c 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -26,5 +26,3 @@ tabulate==0.9 ipywidgets==7.7.1 humanize==4.6.0 matplotlib==3.7.1 -# extras -pandas-gbq==0.19.0 From 279eeb50953205de39a58c34d5cd60eaed085a7a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 10 Dec 2024 13:31:48 -0600 Subject: [PATCH 03/12] allow binary columns to be inlined --- bigframes/session/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index d5e59a0f17..bee02d5a67 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -106,12 +106,13 @@ logger = logging.getLogger(__name__) -# Excludes geography, bytes, and nested (array, struct) datatypes +# Excludes geography and nested (array, struct) datatypes INLINABLE_DTYPES: Sequence[bigframes.dtypes.Dtype] = ( pandas.BooleanDtype(), pandas.Float64Dtype(), pandas.Int64Dtype(), pandas.StringDtype(storage="pyarrow"), + pandas.ArrowDtype(pa.binary()), pandas.ArrowDtype(pa.date32()), pandas.ArrowDtype(pa.time64("us")), pandas.ArrowDtype(pa.timestamp("us")), @@ -803,7 +804,8 @@ def _read_pandas_inline( return None inline_types = inline_df._block.expr.schema.dtypes - # Ibis has problems escaping bytes literals, which will cause syntax errors server-side. + + # Make sure all types are inlinable to avoid escaping errors. noninlinable_types = [ dtype for dtype in inline_types if dtype not in INLINABLE_DTYPES ] @@ -813,6 +815,7 @@ def _read_pandas_inline( if should_raise: raise ValueError( f"Could not inline with a BigQuery type: `{noninlinable_types}`. " + f"{constants.FEEDBACK_LINK}" ) else: return None From 12047248782025d2e9d7719693c40be7f12cc66e Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Wed, 11 Dec 2024 09:58:22 -0600 Subject: [PATCH 04/12] bump minimum pandas-gbq to 0.25.0 --- noxfile.py | 4 ---- setup.py | 2 +- testing/constraints-3.9.txt | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/noxfile.py b/noxfile.py index 341de704e5..680bf243a6 100644 --- a/noxfile.py +++ b/noxfile.py @@ -621,10 +621,6 @@ def prerelease(session: nox.sessions.Session, tests_path, extra_pytest_options=( ) already_installed.add("google-cloud-bigquery-storage") - # Workaround to install pandas-gbq >=0.15.0, which is required by test only. - session.install("--no-deps", "pandas-gbq") - already_installed.add("pandas-gbq") - session.install( *set(UNIT_TEST_STANDARD_DEPENDENCIES + SYSTEM_TEST_STANDARD_DEPENDENCIES), "-c", diff --git a/setup.py b/setup.py index 1eab4bb92a..12ffa10a42 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ "jellyfish >=0.8.9,<1.1.2", "numpy >=1.24.0", "pandas >=1.5.3", - "pandas-gbq >=0.24.0", + "pandas-gbq >=0.25.0", "pyarrow >=10.0.1", "pydata-google-auth >=1.8.2", "requests >=2.27.1", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index daa747e28c..de509dcc7e 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -15,7 +15,7 @@ google-cloud-storage==2.0.0 jellyfish==0.8.9 numpy==1.24.0 pandas==1.5.3 -pandas-gbq==0.24.0 +pandas-gbq==0.25.0 pyarrow==10.0.1 pydata-google-auth==1.8.2 requests==2.27.1 From 72c09a0767ff08c520288732c19b53fe3edd8fb4 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Wed, 11 Dec 2024 13:49:52 -0600 Subject: [PATCH 05/12] use pandas-gbq schema detection in load jobs too --- bigframes/session/loader.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 333444ba1b..45bf192742 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -139,20 +139,20 @@ def read_pandas_load_job( df_and_labels = bf_io_pandas.pandas_to_bq_compatible(pandas_dataframe) pandas_dataframe_copy = df_and_labels.df new_idx_ids = pandas_dataframe_copy.index.names - new_col_ids = pandas_dataframe_copy.columns ordering_col = df_and_labels.ordering_col + schema: list[ + bigquery.SchemaField + ] = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( + pandas_dataframe_copy, + index=True, + ) job_config = bigquery.LoadJobConfig() - # Specify the datetime dtypes, which is auto-detected as timestamp types. - schema: list[bigquery.SchemaField] = [] - for column, dtype in zip(new_col_ids, pandas_dataframe.dtypes): - if dtype == "timestamp[us][pyarrow]": - schema.append( - bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.DATETIME) - ) job_config.schema = schema - # Clustering probably not needed anyways as pandas tables are small + # TODO: Remove this. It's likely that the slower load job due to + # clustering doesn't improve speed of queries because pandas tables are + # small. cluster_cols = [ordering_col] job_config.clustering_fields = cluster_cols @@ -194,7 +194,9 @@ def read_pandas_streaming( pandas_dataframe_copy = df_and_labels.df new_idx_ids = pandas_dataframe_copy.index.names ordering_col = df_and_labels.ordering_col - schema = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( + schema: list[ + bigquery.SchemaField + ] = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( pandas_dataframe_copy, index=True, ) From 5a037056733a50112a5b898ea1d56b7e6bcb536e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Wed, 11 Dec 2024 13:51:53 -0600 Subject: [PATCH 06/12] Update bigframes/session/__init__.py --- bigframes/session/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index bee02d5a67..1241c8be42 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -739,7 +739,8 @@ def read_pandas( if isinstance(pandas_dataframe, pandas.Index): return self._read_pandas( pandas.DataFrame(index=pandas_dataframe), - "read_pandas, write_engine=write_engine", + "read_pandas", + write_engine=write_engine, ).index if isinstance(pandas_dataframe, pandas.DataFrame): return self._read_pandas( From 43d674457a3d703b78f48639ae5112f1c6d11543 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Wed, 11 Dec 2024 14:15:41 -0600 Subject: [PATCH 07/12] fix mypy --- bigframes/session/loader.py | 2 +- tests/unit/polars_session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 45bf192742..d6578ad28b 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -34,7 +34,7 @@ import google.cloud.resourcemanager_v3 import jellyfish import pandas -import pandas_gbq.schema.pandas_to_bigquery +import pandas_gbq.schema.pandas_to_bigquery # type: ignore import bigframes.clients import bigframes.constants diff --git a/tests/unit/polars_session.py b/tests/unit/polars_session.py index 76705ddd74..dfb1f5bfa6 100644 --- a/tests/unit/polars_session.py +++ b/tests/unit/polars_session.py @@ -87,7 +87,7 @@ def __init__(self): self._executor = TestExecutor() self._loader = None # type: ignore - def read_pandas(self, pandas_dataframe): + def read_pandas(self, pandas_dataframe, write_engine="default"): # override read_pandas to always keep data local-only local_block = bigframes.core.blocks.Block.from_local(pandas_dataframe, self) return bigframes.dataframe.DataFrame(local_block) From 6aacbfee3043306a1c37265f37da64db3eb0ba97 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 12 Dec 2024 11:31:06 -0600 Subject: [PATCH 08/12] fix `tests/system/small/test_session.py::test_read_pandas_w_unsupported_mixed_dtype` test --- bigframes/session/__init__.py | 8 ++++---- bigframes/session/loader.py | 9 +++++++++ tests/system/small/test_session.py | 3 +-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index bd7b228af2..fd4fe59ab2 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -828,10 +828,10 @@ def _read_pandas_load_job( ) -> dataframe.DataFrame: try: return self._loader.read_pandas_load_job(pandas_dataframe, api_name) - except pa.ArrowInvalid as e: - raise pa.ArrowInvalid( - f"Could not convert with a BigQuery type: `{e}`. " - ) from e + except (pa.ArrowInvalid, pa.ArrowTypeError) as exc: + raise ValueError( + f"Could not convert with a BigQuery type: `{exc}`." + ) from exc def _read_pandas_streaming( self, diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index d6578ad28b..e7579b1138 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -140,6 +140,11 @@ def read_pandas_load_job( pandas_dataframe_copy = df_and_labels.df new_idx_ids = pandas_dataframe_copy.index.names ordering_col = df_and_labels.ordering_col + + # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/760): + # Once pandas-gbq can show a link to the running load job like + # bigframes does, switch to using pandas-gbq to load the + # bigquery-compatible pandas DataFrame. schema: list[ bigquery.SchemaField ] = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( @@ -194,6 +199,10 @@ def read_pandas_streaming( pandas_dataframe_copy = df_and_labels.df new_idx_ids = pandas_dataframe_copy.index.names ordering_col = df_and_labels.ordering_col + + # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/300): + # Once pandas-gbq can do streaming inserts (again), switch to using + # pandas-gbq to write the bigquery-compatible pandas DataFrame. schema: list[ bigquery.SchemaField ] = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields( diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 03ec7b55a0..eb63a9b72a 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -26,7 +26,6 @@ import google.cloud.bigquery as bigquery import numpy as np import pandas as pd -import pyarrow as pa import pytest import bigframes @@ -624,7 +623,7 @@ def test_read_pandas_index(session): def test_read_pandas_w_unsupported_mixed_dtype(session): - with pytest.raises(pa.ArrowInvalid, match="Could not convert"): + with pytest.raises(ValueError, match="Could not convert"): session.read_pandas(pd.DataFrame({"a": [1, "hello"]})) From 5f3153cf38026f5ad6a22bea2881b45ed207a264 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 12 Dec 2024 11:40:28 -0600 Subject: [PATCH 09/12] add advice for which write_engine to use --- bigframes/session/__init__.py | 18 ++++++++++---- .../pandas/io/parsers/readers.py | 24 +++++-------------- .../bigframes_vendored/pandas/io/pickle.py | 12 +++------- 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index fd4fe59ab2..2c83491e79 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -710,11 +710,19 @@ def read_pandas( values: * "default": - Select either "bigquery_inline" or "bigquery_load", - depending on data size. - * "bigquery_inline": Inline data in BigQuery SQL. - * "bigquery_load": Use a BigQuery load job. - * "bigquery_streaming": Use the BigQuery streaming JSON API. + (Recommended) Select an appropriate mechanism to write data + to BigQuery. Depends on data size and supported data types. + * "bigquery_inline": + Inline data in BigQuery SQL. Use this when you know the data + is small enough to fit within BigQuery's 1 MB query text size + limit. + * "bigquery_load": + Use a BigQuery load job. Use this for larger data sizes. + * "bigquery_streaming": + Use the BigQuery streaming JSON API. Use this if your + workload is such that you exhaust the BigQuery load job + quota and your data cannot be embedded in SQL due to size or + data type limitations. Returns: An equivalent bigframes.pandas.(DataFrame/Series/Index) object diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py index 93af94d5ad..3ad810fa2e 100644 --- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py +++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py @@ -144,15 +144,9 @@ def read_csv( https://docs.python.org/3/library/codecs.html#standard-encodings The BigQuery engine only supports `UTF-8` and `ISO-8859-1`. write_engine (str): - How data should be written to BigQuery (if at all). Supported - values: - - * "default": - Select either "bigquery_inline" or "bigquery_load", - depending on data size. - * "bigquery_inline": Inline data in BigQuery SQL. - * "bigquery_load": Use a BigQuery load job. - * "bigquery_streaming": Use the BigQuery streaming JSON API. + How data should be written to BigQuery (if at all). See + :func:`bigframes.pandas.read_pandas` for a full description of + supported values. **kwargs: keyword arguments for `pandas.read_csv` when not using the BigQuery engine. @@ -240,15 +234,9 @@ def read_json( Type of engine to use. If `engine="bigquery"` is specified, then BigQuery's load API will be used. Otherwise, the engine will be passed to `pandas.read_json`. write_engine (str): - How data should be written to BigQuery (if at all). Supported - values: - - * "default": - Select either "bigquery_inline" or "bigquery_load", - depending on data size. - * "bigquery_inline": Inline data in BigQuery SQL. - * "bigquery_load": Use a BigQuery load job. - * "bigquery_streaming": Use the BigQuery streaming JSON API. + How data should be written to BigQuery (if at all). See + :func:`bigframes.pandas.read_pandas` for a full description of + supported values. **kwargs: keyword arguments for `pandas.read_json` when not using the BigQuery engine. diff --git a/third_party/bigframes_vendored/pandas/io/pickle.py b/third_party/bigframes_vendored/pandas/io/pickle.py index 0dc3ff2b29..33088dc019 100644 --- a/third_party/bigframes_vendored/pandas/io/pickle.py +++ b/third_party/bigframes_vendored/pandas/io/pickle.py @@ -65,15 +65,9 @@ def read_pickle( fsspec.open. Please see fsspec and urllib for more details, and for more examples on storage options refer here. write_engine (str): - How data should be written to BigQuery (if at all). Supported - values: - - * "default": - Select either "bigquery_inline" or "bigquery_load", - depending on data size. - * "bigquery_inline": Inline data in BigQuery SQL. - * "bigquery_load": Use a BigQuery load job. - * "bigquery_streaming": Use the BigQuery streaming JSON API. + How data should be written to BigQuery (if at all). See + :func:`bigframes.pandas.read_pandas` for a full description of + supported values. Returns: bigframes.pandas.DataFrame or bigframes.pandas.Series: same type as object From f40527ea2a59a46fdb12b654a869b892cd7e2efb Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 12 Dec 2024 12:04:57 -0600 Subject: [PATCH 10/12] add unit tests --- bigframes/session/validation.py | 10 +++++---- tests/unit/session/test_session.py | 9 +++++++- third_party/bigframes_vendored/constants.py | 23 +++++++++++---------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/bigframes/session/validation.py b/bigframes/session/validation.py index 7c0f5cc92c..f2e6881844 100644 --- a/bigframes/session/validation.py +++ b/bigframes/session/validation.py @@ -23,14 +23,16 @@ def validate_engine_compatibility(engine, write_engine): "bigquery_streaming", ): raise NotImplementedError( - bigframes_vendored.constants.WRITE_ENGINE_ERROR_TEMPLATE.format( - engine=engine, write_engine=write_engine + bigframes_vendored.constants.WRITE_ENGINE_REQUIRES_LOCAL_ENGINE_TEMPLATE.format( + engine=repr(engine), + write_engine=repr(write_engine), ) ) if engine != "bigquery" and write_engine in ("bigquery_external_table",): raise NotImplementedError( - bigframes_vendored.constants.WRITE_ENGINE_ERROR_TEMPLATE.format( - engine=engine, write_engine=write_engine + bigframes_vendored.constants.WRITE_ENGINE_REQUIRES_BIGQUERY_ENGINE_TEMPLATE.format( + engine=repr(engine), + write_engine=repr(write_engine), ) ) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 8eb0d1abd4..210fc5d633 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -164,8 +164,15 @@ def test_read_csv_pandas_engines_index_col_sequential_int64_not_supported(engine @pytest.mark.parametrize( ("engine", "write_engine"), ( + # Can't use bigquery parsing if parsing the data locally to upload. ("bigquery", "bigquery_streaming"), ("bigquery", "bigquery_inline"), + # No local parsing engines are compatible with bigquery_external_table. + (None, "bigquery_external_table"), + ("c", "bigquery_external_table"), + ("pyarrow", "bigquery_external_table"), + ("python", "bigquery_external_table"), + ("python-fwf", "bigquery_external_table"), ), ) def test_read_csv_with_incompatible_write_engine(engine, write_engine): @@ -174,7 +181,7 @@ def test_read_csv_with_incompatible_write_engine(engine, write_engine): with pytest.raises( NotImplementedError, match=re.escape( - f"write_engine={repr(write_engine)} is incompatible with engine={repr(engine)}" + f"Can't use parsing engine={repr(engine)} with write_engine={repr(write_engine)}, which" ), ): session.read_csv( diff --git a/third_party/bigframes_vendored/constants.py b/third_party/bigframes_vendored/constants.py index b7e0c36aee..d1aaa800cc 100644 --- a/third_party/bigframes_vendored/constants.py +++ b/third_party/bigframes_vendored/constants.py @@ -17,6 +17,7 @@ This module should not depend on any others in the package. """ +import typing from typing import Literal import bigframes_vendored.version @@ -33,19 +34,19 @@ f"{FEEDBACK_LINK}" ) -WRITE_ENGINE_ERROR_TEMPLATE = ( - "write_engine='{write_engine}' is incompatible with engine='{engine}'. " - f"{FEEDBACK_LINK}" +WRITE_ENGINE_TEMPLATE = ( + "Can't use parsing engine={engine} with write_engine={write_engine}, which " +) +WRITE_ENGINE_REQUIRES_LOCAL_ENGINE_TEMPLATE = ( + WRITE_ENGINE_TEMPLATE + "requires a local parsing engine. " + FEEDBACK_LINK +) +WRITE_ENGINE_REQUIRES_BIGQUERY_ENGINE_TEMPLATE = ( + WRITE_ENGINE_TEMPLATE + + "requires the engine='bigquery' parsing engine. " + + FEEDBACK_LINK ) -# TODO(swast): Use unpack operator to avoid redundancy when Python 3.11 is -# the minimum version. -VALID_WRITE_ENGINES = [ - "default", - "bigquery_inline", - "bigquery_load", - "bigquery_streaming", -] WriteEngineType = Literal[ "default", "bigquery_inline", "bigquery_load", "bigquery_streaming" ] +VALID_WRITE_ENGINES = typing.get_args(WriteEngineType) From 4c8e43cfaaf2ecddcf0f249605191730398ee664 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 12 Dec 2024 14:13:02 -0600 Subject: [PATCH 11/12] use prerelease version of pandas-gbq --- noxfile.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/noxfile.py b/noxfile.py index e5f33bf964..9610c1287e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -620,6 +620,11 @@ def prerelease(session: nox.sessions.Session, tests_path, extra_pytest_options=( "git+https://github.com/googleapis/python-bigquery-storage.git#egg=google-cloud-bigquery-storage", ) already_installed.add("google-cloud-bigquery-storage") + session.install( + "--upgrade", + "git+https://github.com/googleapis/python-bigquery-pandas.git#egg=pandas-gbq", + ) + already_installed.add("pandas-gbq") session.install( *set(UNIT_TEST_STANDARD_DEPENDENCIES + SYSTEM_TEST_STANDARD_DEPENDENCIES), From ebcc49ca4aa8f16f8f8f7dd3c9e749cdce1c1e46 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 26 Dec 2024 12:54:56 -0600 Subject: [PATCH 12/12] update to pandas-gbq 0.26.0 --- setup.py | 2 +- testing/constraints-3.9.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 12ffa10a42..f08f53eaf7 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ "jellyfish >=0.8.9,<1.1.2", "numpy >=1.24.0", "pandas >=1.5.3", - "pandas-gbq >=0.25.0", + "pandas-gbq >=0.26.0", "pyarrow >=10.0.1", "pydata-google-auth >=1.8.2", "requests >=2.27.1", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index de509dcc7e..c0ffcfaa1c 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -15,7 +15,7 @@ google-cloud-storage==2.0.0 jellyfish==0.8.9 numpy==1.24.0 pandas==1.5.3 -pandas-gbq==0.25.0 +pandas-gbq==0.26.0 pyarrow==10.0.1 pydata-google-auth==1.8.2 requests==2.27.1