perf: if primary keys are defined, read_gbq avoids copying table data #112

Merged · 5 commits · Oct 16, 2023
3 changes: 2 additions & 1 deletion bigframes/core/indexers.py
@@ -21,6 +21,7 @@
import pandas as pd

import bigframes.constants as constants
import bigframes.core.blocks
import bigframes.core.guid as guid
import bigframes.core.indexes as indexes
import bigframes.core.scalar
@@ -214,7 +215,7 @@ def __getitem__(self, key: tuple) -> bigframes.core.scalar.Scalar:
raise ValueError(error_message)
if len(key) != 2:
raise TypeError(error_message)
block = self._dataframe._block
block: bigframes.core.blocks.Block = self._dataframe._block
column_block = block.select_columns([block.value_columns[key[1]]])
column = bigframes.series.Series(column_block)
return column.iloc[key[0]]
110 changes: 95 additions & 15 deletions bigframes/session.py
@@ -498,6 +498,8 @@ def read_gbq_query(

See also: :meth:`Session.read_gbq`.
"""
# NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
# these docstrings are inline.
return self._read_gbq_query(
query=query,
index_col=index_col,
@@ -515,8 +517,6 @@ def _read_gbq_query(
max_results: Optional[int] = None,
api_name: str,
) -> dataframe.DataFrame:
# NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
# these docstrings are inline.
if isinstance(index_col, str):
index_cols = [index_col]
else:
@@ -561,6 +561,8 @@ def read_gbq_table(

See also: :meth:`Session.read_gbq`.
"""
# NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
# these docstrings are inline.
return self._read_gbq_table(
query=query,
index_col=index_col,
@@ -569,6 +571,62 @@
api_name="read_gbq_table",
)

def _read_gbq_table_to_ibis_with_total_ordering(
self,
table_ref: bigquery.table.TableReference,
*,
api_name: str,
) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
"""Create a read-only Ibis table expression representing a table.

If we can get a total ordering from the table, such as via primary key
column(s), then return those too so that ordering generation can be
avoided.
"""
if table_ref.dataset_id.upper() == "_SESSION":
# _SESSION tables aren't supported by the tables.get REST API.
return (
self.ibis_client.sql(
f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
),
None,
)

table_expression = self.ibis_client.table(
table_ref.table_id,
database=f"{table_ref.project}.{table_ref.dataset_id}",
)

# If there are primary keys defined, the query engine assumes these
# columns are unique, even if the constraint is not enforced. We make
# the same assumption and use these columns as the total ordering keys.
table = self.bqclient.get_table(table_ref)

# TODO(b/305264153): Use public properties to fetch primary keys once
# added to google-cloud-bigquery.
primary_keys = (
table._properties.get("tableConstraints", {})
.get("primaryKey", {})
.get("columns")
)
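
For reference, this lookup expects the tables.get response to carry constraint metadata in roughly the following shape. A minimal sketch in plain Python (the payload layout is an assumption inferred from the lookup chain above, not a documented google-cloud-bigquery contract):

# Sketch: the slice of a tables.get REST response that the private
# Table._properties lookup above reads. Layout assumed, not documented.
table_properties = {
    "tableConstraints": {
        "primaryKey": {"columns": ["name", "gender", "year"]},
    },
}

# The same defensive lookup as above: any level may be missing, in which
# case primary_keys is None and we fall back to generating an ordering.
primary_keys = (
    table_properties.get("tableConstraints", {})
    .get("primaryKey", {})
    .get("columns")
)
assert primary_keys == ["name", "gender", "year"]
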

if not primary_keys:
return table_expression, None
else:
# Read from a snapshot, so that we don't have to copy the table data to create a total ordering.
job_config = bigquery.QueryJobConfig()
job_config.labels["bigframes-api"] = api_name
current_timestamp = list(
self.bqclient.query(
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
job_config=job_config,
).result()
)[0][0]
table_expression = self.ibis_client.sql(
bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
)
return table_expression, primary_keys
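
The snapshot SQL itself isn't shown in this diff; the test below only asserts that it contains FOR SYSTEM_TIME AS OF TIMESTAMP. A hedged sketch of what bigframes_io.create_snapshot_sql plausibly renders (the real helper's exact formatting may differ):

import datetime

import google.cloud.bigquery as bigquery


def create_snapshot_sql_sketch(
    table_ref: bigquery.TableReference, at: datetime.datetime
) -> str:
    # Hypothetical stand-in for bigframes_io.create_snapshot_sql: select from
    # a time-travel snapshot so no copy of the table is needed for ordering.
    return (
        f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`"
        f".`{table_ref.table_id}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP('{at.isoformat()}')"
    )
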

def _read_gbq_table(
self,
query: str,
@@ -581,24 +639,19 @@ def _read_gbq_table(
if max_results and max_results <= 0:
raise ValueError("`max_results` should be a positive number.")

# NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
# these docstrings are inline.
# TODO(swast): Can we re-use the temp table from other reads in the
# session, if the original table wasn't modified?
table_ref = bigquery.table.TableReference.from_string(
query, default_project=self.bqclient.project
)

if table_ref.dataset_id.upper() == "_SESSION":
# _SESSION tables aren't supported by the tables.get REST API.
table_expression = self.ibis_client.sql(
f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
)
else:
table_expression = self.ibis_client.table(
table_ref.table_id,
database=f"{table_ref.project}.{table_ref.dataset_id}",
)
(
table_expression,
total_ordering_cols,
) = self._read_gbq_table_to_ibis_with_total_ordering(
table_ref,
api_name=api_name,
)

for key in col_order:
if key not in table_expression.columns:
@@ -624,7 +677,34 @@
ordering = None
is_total_ordering = False

if len(index_cols) != 0:
if total_ordering_cols is not None:
# Note: currently, a table has a total ordering only when the
# primary key(s) are set on the table. The query engine assumes such
# columns are unique, even if not enforced.
is_total_ordering = True
ordering = core.ExpressionOrdering(
ordering_value_columns=[
core.OrderingColumnReference(column_id)
for column_id in total_ordering_cols
],
total_ordering_columns=frozenset(total_ordering_cols),
)

if len(index_cols) != 0:
index_labels = typing.cast(List[Optional[str]], index_cols)
else:
# Order by the total_ordering_cols and project offsets to use as the default index.
table_expression = table_expression.order_by(total_ordering_cols)
default_index_id = guid.generate_guid("bigframes_index_")
default_index_col = (
ibis.row_number().cast(ibis_dtypes.int64).name(default_index_id)
)
table_expression = table_expression.mutate(
**{default_index_id: default_index_col}
)
index_cols = [default_index_id]
index_labels = [None]
elif len(index_cols) != 0:
index_labels = typing.cast(List[Optional[str]], index_cols)
distinct_table = table_expression.select(*index_cols).distinct()
is_unique_sql = f"""WITH full_table AS (
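
In SQL terms, the default-index branch above amounts to projecting a row number over the primary-key ordering. A sketch of the equivalent query, using the key columns from the test fixture below (the index column name is a generated GUID in practice; bigframes_index_0 and the table path are illustrative):

# Sketch: SQL equivalent of the row_number() default index above. ibis's
# row_number() is zero-based, hence the "- 1" relative to SQL's ROW_NUMBER().
default_index_sql = """
SELECT
  ROW_NUMBER() OVER (ORDER BY name, gender, year) - 1 AS bigframes_index_0,
  *
FROM `my-project.my_dataset.usa_names_grouped`
"""
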
1 change: 0 additions & 1 deletion noxfile.py
@@ -89,7 +89,6 @@
"system",
"doctest",
"cover",
"release_dry_run",
]

# Error if a python version is missing
31 changes: 31 additions & 0 deletions tests/system/conftest.py
@@ -17,6 +17,7 @@
import logging
import math
import pathlib
import textwrap
import typing
from typing import Dict, Optional

@@ -795,6 +796,36 @@ def penguins_randomforest_classifier_model_name(
return model_name


@pytest.fixture(scope="session")
def usa_names_grouped_table(
session: bigframes.Session, dataset_id_permanent
) -> bigquery.Table:
"""Provides a table with primary key(s) set."""
table_id = f"{dataset_id_permanent}.usa_names_grouped"
try:
return session.bqclient.get_table(table_id)
except google.cloud.exceptions.NotFound:
query = textwrap.dedent(
f"""
CREATE TABLE `{dataset_id_permanent}.usa_names_grouped`
(
total_people INT64,
name STRING,
gender STRING,
year INT64,
PRIMARY KEY(name, gender, year) NOT ENFORCED
)
AS
SELECT SUM(`number`) AS total_people, name, gender, year
FROM `bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY name, gender, year
"""
)
job = session.bqclient.query(query)
job.result()
return session.bqclient.get_table(table_id)
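
To confirm the NOT ENFORCED constraint actually landed on the fixture table, the key columns can also be read back via INFORMATION_SCHEMA. A sketch (view and column names assumed from BigQuery's documented metadata schema; the project and dataset are placeholders):

# Sketch: read the fixture table's primary-key columns back through
# INFORMATION_SCHEMA.KEY_COLUMN_USAGE; substitute your project and dataset.
query = """
SELECT column_name, ordinal_position
FROM `my-project.my_dataset.INFORMATION_SCHEMA.KEY_COLUMN_USAGE`
WHERE table_name = 'usa_names_grouped'
ORDER BY ordinal_position
"""
for row in session.bqclient.query(query).result():
    print(row["column_name"], row["ordinal_position"])
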


@pytest.fixture()
def deferred_repr():
bigframes.options.display.repr_mode = "deferred"
25 changes: 25 additions & 0 deletions tests/system/small/test_session.py
@@ -20,6 +20,7 @@
from typing import List

import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import numpy as np
import pandas as pd
import pytest
@@ -231,6 +232,30 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session):
pd.testing.assert_frame_equal(result, expected, check_dtype=False)


def test_read_gbq_w_primary_keys_table(
session: bigframes.Session, usa_names_grouped_table: bigquery.Table
):
table = usa_names_grouped_table
# TODO(b/305264153): Use public properties to fetch primary keys once
# added to google-cloud-bigquery.
primary_keys = (
table._properties.get("tableConstraints", {})
.get("primaryKey", {})
.get("columns")
)
assert len(primary_keys) != 0

df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}")
result = df.head(100).to_pandas()

# Verify that the DataFrame is already sorted by primary keys.
sorted_result = result.sort_values(primary_keys)
pd.testing.assert_frame_equal(result, sorted_result)

# Verify that we're working from a snapshot rather than a copy of the table.
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql


@pytest.mark.parametrize(
("query_or_table", "max_results"),
[
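
Putting it together, the user-visible effect of this change looks roughly like the following (a sketch: the project and dataset are placeholders, and the table is assumed to declare the NOT ENFORCED primary key from the fixture above):

import bigframes.pandas as bpd

# Reading a table whose primary key is declared (even NOT ENFORCED):
# bigframes uses the key columns as the total ordering, reading from a
# FOR SYSTEM_TIME AS OF snapshot instead of copying the table first.
df = bpd.read_gbq("my-project.my_dataset.usa_names_grouped")

# Rows come back ordered by the key columns (name, gender, year), and the
# compiled SQL references the snapshot rather than a session-owned copy.
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
print(df.head(5).to_pandas())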