
feat: Support max_columns in repr and make repr more efficient #515

Merged: 8 commits, Mar 29, 2024
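For orientation, this is roughly how the options this PR wires into repr surface to users. A minimal sketch, assuming the public bigframes.pandas entry point and a public sample table; the snippet is illustrative and not part of the PR:

    import bigframes.pandas as bpd

    bpd.options.display.max_rows = 25     # rows fetched for the repr preview
    bpd.options.display.max_columns = 20  # columns kept in the printed frame

    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    print(df)  # with this PR, downloads one page of results, not the whole table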
42 changes: 29 additions & 13 deletions bigframes/core/blocks.py
@@ -464,6 +464,23 @@ def to_pandas_batches(self):
            self._copy_index_to_pandas(df)
            yield df

+    def download_pandas_preview(
+        self, max_rows: int
+    ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
+        """Download one page of results and return the query job."""
+        dtypes = dict(zip(self.index_columns, self.index.dtypes))
+        dtypes.update(zip(self.value_columns, self.dtypes))
+        results_iterator, query_job = self.session._execute(
+            self.expr, sorted=True, max_results=max_rows
+        )
+        arrow_results_iterator = results_iterator.to_arrow_iterable()
+        arrow_table = next(arrow_results_iterator)
+        downloaded_df = bigframes.session._io.pandas.arrow_to_pandas(
+            arrow_table, dtypes
+        )
+        self._copy_index_to_pandas(downloaded_df)
+        return downloaded_df, query_job
+
    def _copy_index_to_pandas(self, df: pd.DataFrame):
        """Set the index on pandas DataFrame to match this block.

@@ -1294,26 +1311,25 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1):
    # queries.
    @functools.cache
    def retrieve_repr_request_results(
-        self, max_results: int
-    ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]:
+        self, max_results: int, max_columns: int
+    ) -> Tuple[pd.DataFrame, Tuple[int, int], bigquery.QueryJob]:
        """
        Retrieves a pandas dataframe containing only max_results many rows for use
        with printing methods.

-        Returns a tuple of the dataframe and the overall number of rows of the query.
+        Returns a tuple of the dataframe preview for printing and the overall number
+        of rows and columns of the table, as well as the query job used.
        """
-        # TODO(swast): Select a subset of columns if max_columns is less than the
-        # number of columns in the schema.
-        count = self.shape[0]
-        if count > max_results:
-            head_block = self.slice(0, max_results)
-        else:
-            head_block = self
-        computed_df, query_job = head_block.to_pandas()
-        formatted_df = computed_df.set_axis(self.column_labels, axis=1)
+        pandas_df, query_job = self.download_pandas_preview(max_results)
+        row_count = self.session._get_table_row_count(query_job.destination)
+        column_count = len(self.value_columns)
+
+        formatted_df = pandas_df.set_axis(self.column_labels, axis=1)
        # we reset the axis and substitute the bf index name for the default
        formatted_df.index.name = self.index.name
-        return formatted_df, count, query_job
+        # limit column count
+        formatted_df = formatted_df.iloc[:, 0:max_columns]
+        return formatted_df, (row_count, column_count), query_job

    def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
        result_id = guid.generate_guid()
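The efficiency gain in download_pandas_preview comes from pairing a server-side max_results cap with to_arrow_iterable, so only the first page of the result is materialized. A standalone sketch of the same pattern against the raw BigQuery client, with a placeholder query and page size that are not taken from this PR:

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT * FROM `bigquery-public-data.ml_datasets.penguins`")
    rows = job.result(max_results=25)  # cap the rows server-side, like _execute above

    # to_arrow_iterable() yields the result one page at a time, so next(...)
    # downloads a single page instead of the whole result set.
    first_page = next(rows.to_arrow_iterable())
    preview_df = first_page.to_pandas()
    print(preview_df.shape)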
10 changes: 5 additions & 5 deletions bigframes/core/indexes/index.py
@@ -205,17 +205,17 @@ def query_job(self) -> Optional[bigquery.QueryJob]:
        return self._query_job

    def __repr__(self) -> str:
-        # TODO(swast): Add a timeout here? If the query is taking a long time,
-        # maybe we just print the job metadata that we have so far?
-        # TODO(swast): Avoid downloading the whole series by using job
-        # metadata, like we do with DataFrame.
        opts = bigframes.options.display
        max_results = opts.max_rows
+        max_columns = opts.max_columns
        if opts.repr_mode == "deferred":
            return formatter.repr_query_job(self.query_job)

-        pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
+        pandas_df, _, query_job = self._block.retrieve_repr_request_results(
+            max_results, max_columns
+        )
        self._query_job = query_job

        return repr(pandas_df.index)

    def copy(self, name: Optional[Hashable] = None):
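All three reprs touched by this PR short-circuit on the same repr_mode option before any preview query runs. A sketch of the deferred path, under the same assumptions as the snippet near the top:

    import bigframes.pandas as bpd

    # "deferred" repr mode prints job metadata via formatter.repr_query_job
    # instead of downloading any table contents.
    bpd.options.display.repr_mode = "deferred"
    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    repr(df)  # no rows are downloaded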
66 changes: 26 additions & 40 deletions bigframes/dataframe.py
@@ -574,71 +574,57 @@ def __setattr__(self, key: str, value):
        object.__setattr__(self, key, value)

    def __repr__(self) -> str:
-        """Converts a DataFrame to a string. Calls to_pandas.
+        """Converts a DataFrame to a string using pandas dataframe __repr__.

-        Only represents the first `bigframes.options.display.max_rows`.
+        Only represents the first `bigframes.options.display.max_rows`
+        and `bigframes.options.display.max_columns`.
        """
-        opts = bigframes.options.display
-        max_results = opts.max_rows
-        if opts.repr_mode == "deferred":
+        if bigframes.options.display.repr_mode == "deferred":
            return formatter.repr_query_job(self.query_job)

-        self._cached()
-        # TODO(swast): pass max_columns and get the true column count back. Maybe
-        # get 1 more column than we have requested so that pandas can add the
-        # ... for us?
-        pandas_df, row_count, query_job = self._block.retrieve_repr_request_results(
-            max_results
-        )
-
-        self._set_internal_query_job(query_job)
-
-        column_count = len(pandas_df.columns)
-
-        with display_options.pandas_repr(opts):
+        pandas_df, shape = self._perform_repr_request()
+        with display_options.pandas_repr(bigframes.options.display):
            repr_string = repr(pandas_df)

        # Modify the end of the string to reflect count.
        lines = repr_string.split("\n")
        pattern = re.compile("\\[[0-9]+ rows x [0-9]+ columns\\]")
        if pattern.match(lines[-1]):
            lines = lines[:-2]

-        if row_count > len(lines) - 1:
+        if shape[0] > len(lines) - 1:
            lines.append("...")

        lines.append("")
-        lines.append(f"[{row_count} rows x {column_count} columns]")
+        lines.append(f"[{shape[0]} rows x {shape[1]} columns]")
        return "\n".join(lines)

+    def _perform_repr_request(self) -> Tuple[pandas.DataFrame, Tuple[int, int]]:
+        max_results = bigframes.options.display.max_rows
+        max_columns = bigframes.options.display.max_columns
+        self._cached()
+        pandas_df, shape, query_job = self._block.retrieve_repr_request_results(
+            max_results, max_columns
+        )
+        self._set_internal_query_job(query_job)
+        return pandas_df, shape
+
    def _repr_html_(self) -> str:
        """
        Returns an html string primarily for use by notebooks for displaying
-        a representation of the DataFrame. Displays 20 rows by default since
-        many notebooks are not configured for large tables.
+        a representation of the DataFrame. Displays at most the number of rows
+        and columns given by `bigframes.options.display.max_rows` and
+        `bigframes.options.display.max_columns`.
        """
-        opts = bigframes.options.display
-        max_results = bigframes.options.display.max_rows
-        if opts.repr_mode == "deferred":
-            return formatter.repr_query_job_html(self.query_job)
-
-        self._cached()
-        # TODO(swast): pass max_columns and get the true column count back. Maybe
-        # get 1 more column than we have requested so that pandas can add the
-        # ... for us?
-        pandas_df, row_count, query_job = self._block.retrieve_repr_request_results(
-            max_results
-        )
-
-        self._set_internal_query_job(query_job)
+        if bigframes.options.display.repr_mode == "deferred":
+            return formatter.repr_query_job_html(self.query_job)

-        column_count = len(pandas_df.columns)
+        pandas_df, shape = self._perform_repr_request()

-        with display_options.pandas_repr(opts):
+        with display_options.pandas_repr(bigframes.options.display):
            # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy.
            html_string = pandas_df._repr_html_()  # type:ignore

-        html_string += f"[{row_count} rows x {column_count} columns in total]"
+        html_string += f"[{shape[0]} rows x {shape[1]} columns in total]"
        return html_string

    def __setitem__(self, key: str, value: SingleItemValue):
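The footer handling in __repr__ is easy to miss: pandas prints a shape line for the small preview frame, and the code above replaces it with the true table shape taken from job metadata. A self-contained sketch of that rewrite, with placeholder values standing in for the preview repr and the real counts:

    import re

    repr_string = "   a  b\n0  1  2\n1  3  4\n\n[2 rows x 2 columns]"
    shape = (1_000_000, 2)  # placeholder: true shape from table metadata

    lines = repr_string.split("\n")
    pattern = re.compile("\\[[0-9]+ rows x [0-9]+ columns\\]")
    if pattern.match(lines[-1]):
        lines = lines[:-2]  # drop the trailing blank line and pandas' own footer
    if shape[0] > len(lines) - 1:
        lines.append("...")  # hint that the preview is truncated
    lines.append("")
    lines.append(f"[{shape[0]} rows x {shape[1]} columns]")
    print("\n".join(lines))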
9 changes: 4 additions & 5 deletions bigframes/series.py
@@ -272,17 +272,16 @@ def reset_index(
        return bigframes.dataframe.DataFrame(block)

    def __repr__(self) -> str:
-        # TODO(swast): Add a timeout here? If the query is taking a long time,
-        # maybe we just print the job metadata that we have so far?
-        # TODO(swast): Avoid downloading the whole series by using job
-        # metadata, like we do with DataFrame.
        opts = bigframes.options.display
        max_results = opts.max_rows
+        max_columns = opts.max_columns
        if opts.repr_mode == "deferred":
            return formatter.repr_query_job(self.query_job)

        self._cached()
-        pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
+        pandas_df, _, query_job = self._block.retrieve_repr_request_results(
+            max_results, max_columns
+        )
        self._set_internal_query_job(query_job)

        return repr(pandas_df.iloc[:, 0])
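Series.__repr__ reuses the same Block helper as DataFrame and then keeps only its single value column. A short sketch, same assumptions as the earlier snippets; the column name is just an example from the sample table:

    import bigframes.pandas as bpd

    s = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")["body_mass_g"]
    # One bounded preview query via the shared helper, then pandas' own
    # Series repr on the first (and only) value column.
    print(s)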
8 changes: 6 additions & 2 deletions bigframes/session/__init__.py
@@ -1831,6 +1831,7 @@ def _execute(
        sorted: bool = True,
        dry_run=False,
        col_id_overrides: Mapping[str, str] = {},
+        max_results: Optional[int] = None,
    ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
        sql = self._to_sql(
            array_value, sorted=sorted, col_id_overrides=col_id_overrides
@@ -1840,8 +1841,7 @@
        else:
            job_config.dry_run = dry_run
        return self._start_query(
-            sql=sql,
-            job_config=job_config,
+            sql=sql, job_config=job_config, max_results=max_results
        )

    def _peek(
@@ -1886,6 +1886,10 @@ def _get_table_size(self, destination_table):
        table = self.bqclient.get_table(destination_table)
        return table.num_bytes

+    def _get_table_row_count(self, destination_table) -> int:
+        table = self.bqclient.get_table(destination_table)
+        return table.num_rows
+
    def _rows_to_dataframe(
        self, row_iterator: bigquery.table.RowIterator, dtypes: Dict
    ) -> pandas.DataFrame:
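_get_table_row_count leans on the fact that a completed query job's results land in a destination table whose metadata records an exact row count, so the repr can report the true shape without a COUNT(*) query or any extra scan. A standalone sketch with the raw client and a placeholder query:

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT * FROM `bigquery-public-data.ml_datasets.penguins`")
    job.result()  # wait for completion so job.destination is populated

    table = client.get_table(job.destination)
    print(table.num_rows)   # exact row count from metadata, no data scanned
    print(table.num_bytes)  # the same lookup backs _get_table_size above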