From f1d697a54ecf8059e976c954e9c55467b0eef96e Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 25 Mar 2024 23:03:50 +0000 Subject: [PATCH 1/8] feat: Support max_columns in repr and make repr more efficient --- bigframes/core/blocks.py | 48 ++++++++++++++++---------- bigframes/core/indexes/index.py | 11 +++--- bigframes/dataframe.py | 60 ++++++++++++--------------------- bigframes/series.py | 11 +++--- bigframes/session/__init__.py | 10 ++++-- 5 files changed, 71 insertions(+), 69 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index afa13375b1..e4ad02cc9d 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -464,6 +464,23 @@ def to_pandas_batches(self): self._copy_index_to_pandas(df) yield df + def download_pandas_preview( + self, max_rows: int + ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: + """Download one page of results and return the query job.""" + dtypes = dict(zip(self.index_columns, self.index.dtypes)) + dtypes.update(zip(self.value_columns, self.dtypes)) + results_iterator, query_job = self.session._execute( + self.expr, sorted=True, max_results=max_rows + ) + results_iterator = results_iterator.to_arrow_iterable() + arrow_table = next(results_iterator) + downloaded_df = bigframes.session._io.pandas.arrow_to_pandas( + arrow_table, dtypes + ) + self._copy_index_to_pandas(downloaded_df) + return downloaded_df, query_job + def _copy_index_to_pandas(self, df: pd.DataFrame): """Set the index on pandas DataFrame to match this block. @@ -1289,31 +1306,28 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1): return block.select_columns(self.value_columns) - # Using cache to optimize for Jupyter Notebook's behavior where both '__repr__' - # and '__repr_html__' are called in a single display action, reducing redundant - # queries. @functools.cache def retrieve_repr_request_results( - self, max_results: int - ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]: + self, max_results: int, max_columns: int + ) -> Tuple[pd.DataFrame, Tuple[int, int], bigquery.QueryJob, Block]: """ Retrieves a pandas dataframe containing only max_results many rows for use with printing methods. - Returns a tuple of the dataframe and the overall number of rows of the query. + Returns a tuple of the dataframe preview for printing and the overall number + of rows and columns of the table, as well as the query job used and the cached + block generated. """ - # TODO(swast): Select a subset of columns if max_columns is less than the - # number of columns in the schema. - count = self.shape[0] - if count > max_results: - head_block = self.slice(0, max_results) - else: - head_block = self - computed_df, query_job = head_block.to_pandas() - formatted_df = computed_df.set_axis(self.column_labels, axis=1) + cached_block = self.cached() + pandas_df, query_job = cached_block.download_pandas_preview(max_results) + shape = cached_block.session._get_table_shape(query_job.destination) + + formatted_df = pandas_df.set_axis(cached_block.column_labels, axis=1) # we reset the axis and substitute the bf index name for the default - formatted_df.index.name = self.index.name - return formatted_df, count, query_job + formatted_df.index.name = cached_block.index.name + # limit column count + formatted_df = formatted_df.iloc[:, 0:max_columns] + return formatted_df, shape, query_job, cached_block def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index c818b68711..c6f168528f 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -205,17 +205,18 @@ def query_job(self) -> Optional[bigquery.QueryJob]: return self._query_job def __repr__(self) -> str: - # TODO(swast): Add a timeout here? If the query is taking a long time, - # maybe we just print the job metadata that we have so far? - # TODO(swast): Avoid downloading the whole series by using job - # metadata, like we do with DataFrame. opts = bigframes.options.display max_results = opts.max_rows + max_columns = opts.max_columns if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) + pandas_df, _, query_job, block = self._block.retrieve_repr_request_results( + max_results, max_columns + ) self._query_job = query_job + self._block = block + return repr(pandas_df.index) def copy(self, name: Optional[Hashable] = None): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 07dae2c53b..5a8b75f8ec 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -578,24 +578,9 @@ def __repr__(self) -> str: Only represents the first `bigframes.options.display.max_rows`. """ - opts = bigframes.options.display - max_results = opts.max_rows - if opts.repr_mode == "deferred": - return formatter.repr_query_job(self.query_job) + pandas_df, shape = self._perform_repr_request() - self._cached() - # TODO(swast): pass max_columns and get the true column count back. Maybe - # get 1 more column than we have requested so that pandas can add the - # ... for us? - pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( - max_results - ) - - self._set_internal_query_job(query_job) - - column_count = len(pandas_df.columns) - - with display_options.pandas_repr(opts): + with display_options.pandas_repr(bigframes.options.display): repr_string = repr(pandas_df) # Modify the end of the string to reflect count. @@ -603,42 +588,39 @@ def __repr__(self) -> str: pattern = re.compile("\\[[0-9]+ rows x [0-9]+ columns\\]") if pattern.match(lines[-1]): lines = lines[:-2] - - if row_count > len(lines) - 1: + if shape[0] > len(lines) - 1: lines.append("...") - lines.append("") - lines.append(f"[{row_count} rows x {column_count} columns]") + lines.append(f"[{shape[0]} rows x {shape[1]} columns]") return "\n".join(lines) + def _perform_repr_request(self) -> Tuple[pandas.DataFrame, Tuple[int, int]]: + max_results = bigframes.options.display.max_rows + max_columns = bigframes.options.display.max_columns + pandas_df, shape, query_job, block = self._block.retrieve_repr_request_results( + max_results, max_columns + ) + self._set_block(block) + self._set_internal_query_job(query_job) + return pandas_df, shape + def _repr_html_(self) -> str: """ Returns an html string primarily for use by notebooks for displaying - a representation of the DataFrame. Displays 20 rows by default since - many notebooks are not configured for large tables. + a representation of the DataFrame. Displays at most the number of rows + given by `bigframes.options.display.max_rows`. """ - opts = bigframes.options.display - max_results = bigframes.options.display.max_rows - if opts.repr_mode == "deferred": - return formatter.repr_query_job_html(self.query_job) - self._cached() - # TODO(swast): pass max_columns and get the true column count back. Maybe - # get 1 more column than we have requested so that pandas can add the - # ... for us? - pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( - max_results - ) - - self._set_internal_query_job(query_job) + if bigframes.options.display.repr_mode == "deferred": + return formatter.repr_query_job_html(self.query_job) - column_count = len(pandas_df.columns) + pandas_df, shape = self._perform_repr_request() - with display_options.pandas_repr(opts): + with display_options.pandas_repr(bigframes.options.display): # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy. html_string = pandas_df._repr_html_() # type:ignore - html_string += f"[{row_count} rows x {column_count} columns in total]" + html_string += f"[{shape[0]} rows x {shape[1]} columns in total]" return html_string def __setitem__(self, key: str, value: SingleItemValue): diff --git a/bigframes/series.py b/bigframes/series.py index e7b358c2fe..04cc6b635c 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -272,18 +272,17 @@ def reset_index( return bigframes.dataframe.DataFrame(block) def __repr__(self) -> str: - # TODO(swast): Add a timeout here? If the query is taking a long time, - # maybe we just print the job metadata that we have so far? - # TODO(swast): Avoid downloading the whole series by using job - # metadata, like we do with DataFrame. opts = bigframes.options.display max_results = opts.max_rows + max_columns = opts.max_columns if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - self._cached() - pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) + pandas_df, _, query_job, block = self._block.retrieve_repr_request_results( + max_results, max_columns + ) self._set_internal_query_job(query_job) + self._set_block(block) return repr(pandas_df.iloc[:, 0]) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 479b3a7bac..8de235e275 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1831,6 +1831,7 @@ def _execute( sorted: bool = True, dry_run=False, col_id_overrides: Mapping[str, str] = {}, + max_results: Optional[int] = None, ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: sql = self._to_sql( array_value, sorted=sorted, col_id_overrides=col_id_overrides @@ -1840,8 +1841,7 @@ def _execute( else: job_config.dry_run = dry_run return self._start_query( - sql=sql, - job_config=job_config, + sql=sql, job_config=job_config, max_results=max_results ) def _peek( @@ -1886,6 +1886,12 @@ def _get_table_size(self, destination_table): table = self.bqclient.get_table(destination_table) return table.num_bytes + def _get_table_shape(self, destination_table): + table = self.bqclient.get_table(destination_table) + row_count = table.num_rows + column_count = len(table.schema) + return (row_count, column_count) + def _rows_to_dataframe( self, row_iterator: bigquery.table.RowIterator, dtypes: Dict ) -> pandas.DataFrame: From b84600c91ec73376917207c8c920ea430210ad5e Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 25 Mar 2024 23:14:35 +0000 Subject: [PATCH 2/8] add a type hint --- bigframes/session/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 8de235e275..6f65ad5bfa 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1886,7 +1886,7 @@ def _get_table_size(self, destination_table): table = self.bqclient.get_table(destination_table) return table.num_bytes - def _get_table_shape(self, destination_table): + def _get_table_shape(self, destination_table) -> Tuple[int, int]: table = self.bqclient.get_table(destination_table) row_count = table.num_rows column_count = len(table.schema) From 2189426f276d66631f61c8745c30b0f4be7dded4 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 25 Mar 2024 23:34:01 +0000 Subject: [PATCH 3/8] fix mypy error --- bigframes/core/blocks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index e4ad02cc9d..b218c85b57 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -473,8 +473,8 @@ def download_pandas_preview( results_iterator, query_job = self.session._execute( self.expr, sorted=True, max_results=max_rows ) - results_iterator = results_iterator.to_arrow_iterable() - arrow_table = next(results_iterator) + arrow_results_iterator = results_iterator.to_arrow_iterable() + arrow_table = next(arrow_results_iterator) downloaded_df = bigframes.session._io.pandas.arrow_to_pandas( arrow_table, dtypes ) From ab3e765c14f9d3ae7e7799804857177da9b3f3dd Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 26 Mar 2024 04:19:29 +0000 Subject: [PATCH 4/8] move cached back out of block --- bigframes/core/blocks.py | 19 ++++++++++--------- bigframes/core/indexes/index.py | 3 +-- bigframes/dataframe.py | 6 ++++-- bigframes/series.py | 4 ++-- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index b218c85b57..69767eebff 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1306,28 +1306,29 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1): return block.select_columns(self.value_columns) + # Using cache to optimize for Jupyter Notebook's behavior where both '__repr__' + # and '__repr_html__' are called in a single display action, reducing redundant + # queries. @functools.cache def retrieve_repr_request_results( self, max_results: int, max_columns: int - ) -> Tuple[pd.DataFrame, Tuple[int, int], bigquery.QueryJob, Block]: + ) -> Tuple[pd.DataFrame, Tuple[int, int], bigquery.QueryJob]: """ Retrieves a pandas dataframe containing only max_results many rows for use with printing methods. Returns a tuple of the dataframe preview for printing and the overall number - of rows and columns of the table, as well as the query job used and the cached - block generated. + of rows and columns of the table, as well as the query job used. """ - cached_block = self.cached() - pandas_df, query_job = cached_block.download_pandas_preview(max_results) - shape = cached_block.session._get_table_shape(query_job.destination) + pandas_df, query_job = self.download_pandas_preview(max_results) + shape = self.session._get_table_shape(query_job.destination) - formatted_df = pandas_df.set_axis(cached_block.column_labels, axis=1) + formatted_df = pandas_df.set_axis(self.column_labels, axis=1) # we reset the axis and substitute the bf index name for the default - formatted_df.index.name = cached_block.index.name + formatted_df.index.name = self.index.name # limit column count formatted_df = formatted_df.iloc[:, 0:max_columns] - return formatted_df, shape, query_job, cached_block + return formatted_df, shape, query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index c6f168528f..48988aaffe 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -211,11 +211,10 @@ def __repr__(self) -> str: if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - pandas_df, _, query_job, block = self._block.retrieve_repr_request_results( + pandas_df, _, query_job = self._block.retrieve_repr_request_results( max_results, max_columns ) self._query_job = query_job - self._block = block return repr(pandas_df.index) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 5a8b75f8ec..d8ca5d0686 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -580,6 +580,8 @@ def __repr__(self) -> str: """ pandas_df, shape = self._perform_repr_request() + if bigframes.options.display.repr_mode == "deferred": + return formatter.repr_query_job(self.query_job) with display_options.pandas_repr(bigframes.options.display): repr_string = repr(pandas_df) @@ -597,10 +599,10 @@ def __repr__(self) -> str: def _perform_repr_request(self) -> Tuple[pandas.DataFrame, Tuple[int, int]]: max_results = bigframes.options.display.max_rows max_columns = bigframes.options.display.max_columns - pandas_df, shape, query_job, block = self._block.retrieve_repr_request_results( + self._cached() + pandas_df, shape, query_job = self._block.retrieve_repr_request_results( max_results, max_columns ) - self._set_block(block) self._set_internal_query_job(query_job) return pandas_df, shape diff --git a/bigframes/series.py b/bigframes/series.py index 04cc6b635c..f1ac89f514 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -278,11 +278,11 @@ def __repr__(self) -> str: if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - pandas_df, _, query_job, block = self._block.retrieve_repr_request_results( + self._cached() + pandas_df, _, query_job = self._block.retrieve_repr_request_results( max_results, max_columns ) self._set_internal_query_job(query_job) - self._set_block(block) return repr(pandas_df.iloc[:, 0]) From 58ae8cf2f1a7f642957752177e11f47f19b219ca Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 26 Mar 2024 04:26:30 +0000 Subject: [PATCH 5/8] fix deferred mode --- bigframes/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index d8ca5d0686..4a54661e15 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -578,10 +578,10 @@ def __repr__(self) -> str: Only represents the first `bigframes.options.display.max_rows`. """ - pandas_df, shape = self._perform_repr_request() - if bigframes.options.display.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) + + pandas_df, shape = self._perform_repr_request() with display_options.pandas_repr(bigframes.options.display): repr_string = repr(pandas_df) From 028afadb75ddcf7679eb649194d5dd81848332b7 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 26 Mar 2024 17:13:36 +0000 Subject: [PATCH 6/8] don't count rowindex in shape --- bigframes/session/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 6f65ad5bfa..266a2dce26 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1889,7 +1889,9 @@ def _get_table_size(self, destination_table): def _get_table_shape(self, destination_table) -> Tuple[int, int]: table = self.bqclient.get_table(destination_table) row_count = table.num_rows - column_count = len(table.schema) + column_count = len( + [field for field in table.schema if field.name != "rowindex"] + ) return (row_count, column_count) def _rows_to_dataframe( From 92c2d02a3e13bb52e7d7afc87ad0077835d6060b Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 26 Mar 2024 18:08:00 +0000 Subject: [PATCH 7/8] just use block for column count --- bigframes/core/blocks.py | 5 +++-- bigframes/session/__init__.py | 8 ++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 69767eebff..6827e1afe8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1321,14 +1321,15 @@ def retrieve_repr_request_results( of rows and columns of the table, as well as the query job used. """ pandas_df, query_job = self.download_pandas_preview(max_results) - shape = self.session._get_table_shape(query_job.destination) + row_count = self.session._get_table_row_count(query_job.destination) + column_count = len(self.value_columns) formatted_df = pandas_df.set_axis(self.column_labels, axis=1) # we reset the axis and substitute the bf index name for the default formatted_df.index.name = self.index.name # limit column count formatted_df = formatted_df.iloc[:, 0:max_columns] - return formatted_df, shape, query_job + return formatted_df, (row_count, column_count), query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 266a2dce26..6a2c87bb05 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1886,13 +1886,9 @@ def _get_table_size(self, destination_table): table = self.bqclient.get_table(destination_table) return table.num_bytes - def _get_table_shape(self, destination_table) -> Tuple[int, int]: + def _get_table_row_count(self, destination_table) -> int: table = self.bqclient.get_table(destination_table) - row_count = table.num_rows - column_count = len( - [field for field in table.schema if field.name != "rowindex"] - ) - return (row_count, column_count) + return table.num_rows def _rows_to_dataframe( self, row_iterator: bigquery.table.RowIterator, dtypes: Dict From 8e08af8db3b4262d2377127cbb2934f3104d534d Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 26 Mar 2024 19:34:52 +0000 Subject: [PATCH 8/8] update comment --- bigframes/dataframe.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4a54661e15..5b36f2deb5 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -574,9 +574,10 @@ def __setattr__(self, key: str, value): object.__setattr__(self, key, value) def __repr__(self) -> str: - """Converts a DataFrame to a string. Calls to_pandas. + """Converts a DataFrame to a string using pandas dataframe __repr__. - Only represents the first `bigframes.options.display.max_rows`. + Only represents the first `bigframes.options.display.max_rows` + and `bigframes.options.display.max_columns`. """ if bigframes.options.display.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) @@ -610,7 +611,8 @@ def _repr_html_(self) -> str: """ Returns an html string primarily for use by notebooks for displaying a representation of the DataFrame. Displays at most the number of rows - given by `bigframes.options.display.max_rows`. + and columns given by `bigframes.options.display.max_rows` and + `bigframes.options.display.max_columns`. """ if bigframes.options.display.repr_mode == "deferred":