perf: add ability to cache dataframe and series to session table #51

Merged
merged 2 commits on Sep 23, 2023
23 changes: 23 additions & 0 deletions bigframes/core/__init__.py
@@ -1145,6 +1145,29 @@ def slice(
        )
        return sliced_expr if step > 0 else sliced_expr.reversed()

    def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue:
        """Write the ArrayValue to a session table and create a new block object that references it."""
        ibis_expr = self._to_ibis_expr(
            ordering_mode="unordered", expose_hidden_cols=True
        )
        destination = self._session._ibis_to_session_table(
            ibis_expr, cluster_cols=cluster_cols, api_name="cache"
        )
        table_expression = self._session.ibis_client.sql(
            f"SELECT * FROM `_SESSION`.`{destination.table_id}`"
        )
        new_columns = [table_expression[column] for column in self.column_names]
        new_hidden_columns = [
            table_expression[column] for column in self._hidden_ordering_column_names
        ]
        return ArrayValue(
            self._session,
            table_expression,
            columns=new_columns,
            hidden_ordering_columns=new_hidden_columns,
            ordering=self._ordering,
        )


class ArrayValueBuilder:
    """Mutable expression class.
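The new ArrayValue.cached materializes the current expression into a session-scoped temporary table, clustered on the caller-supplied columns, and then rebuilds an equivalent ArrayValue on top of a plain scan of that table, so downstream operations read the cached result instead of replaying the whole query tree. Below is a minimal sketch of that materialize-and-repoint pattern under a simplified expression type; Expr, cached, _SESSION_TABLES, and _table_ids are illustrative stand-ins, not bigframes APIs.

# Minimal sketch of the materialize-and-repoint pattern; Expr, cached, and
# _SESSION_TABLES are illustrative stand-ins, not bigframes APIs.
import itertools
from dataclasses import dataclass, replace

_SESSION_TABLES: dict = {}  # stand-in for BigQuery session-scoped temp tables
_table_ids = (f"bqdf_cache_{i}" for i in itertools.count())


@dataclass(frozen=True)
class Expr:
    sql: str  # SQL that produces this expression's rows
    columns: tuple  # visible column names
    hidden_ordering_columns: tuple  # ordering columns kept alongside the data


def cached(expr: Expr, cluster_cols: tuple) -> Expr:
    """Materialize the expression once, then re-point it at the cached table."""
    table_id = next(_table_ids)
    # "Write" the current result (visible plus hidden ordering columns) to the
    # session; in BigQuery this would be a temp table clustered on cluster_cols.
    _SESSION_TABLES[table_id] = expr.sql
    # The returned expression simply scans the cache, so later operations no
    # longer replay the original query tree.
    return replace(expr, sql=f"SELECT * FROM `_SESSION`.`{table_id}`")

Carrying the hidden ordering columns and the original ordering through to the new ArrayValue is what lets the cached copy preserve the same row-order guarantees as the expression it replaces.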
9 changes: 9 additions & 0 deletions bigframes/core/blocks.py
@@ -1467,6 +1467,15 @@ def to_sql_query(
            idx_labels,
        )

    def cached(self) -> Block:
        """Write the block to a session table and create a new block object that references it."""
        return Block(
            self.expr.cached(cluster_cols=self.index_columns),
            index_columns=self.index_columns,
            column_labels=self.column_labels,
            index_labels=self.index_labels,
        )

    def _is_monotonic(
        self, column_ids: typing.Union[str, Sequence[str]], increasing: bool
    ) -> bool:
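Block.cached is the layer that decides what to cluster on: it hands the block's index columns to ArrayValue.cached as the clustering keys and carries the index columns, column labels, and index labels over to a fresh Block unchanged. A rough sketch of that wiring, reusing the illustrative Expr and cached helpers from the previous snippet (SimpleBlock below is likewise a hypothetical stand-in, not the real Block):

# Hedged sketch of the Block.cached wiring, built on the Expr/cached stand-ins
# defined above; SimpleBlock is hypothetical, not the real bigframes Block.
from dataclasses import dataclass


@dataclass(frozen=True)
class SimpleBlock:
    expr: Expr
    index_columns: tuple
    column_labels: tuple
    index_labels: tuple

    def cached(self) -> "SimpleBlock":
        # Cluster the materialized table on the index columns so index-aligned
        # work that follows (sorts, joins, loc lookups) can prune efficiently;
        # all labels keep pointing at the same logical columns.
        return SimpleBlock(
            expr=cached(self.expr, cluster_cols=self.index_columns),
            index_columns=self.index_columns,
            column_labels=self.column_labels,
            index_labels=self.index_labels,
        )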
3 changes: 3 additions & 0 deletions bigframes/dataframe.py
@@ -2480,3 +2480,6 @@ def _set_block(self, block: blocks.Block):

    def _get_block(self) -> blocks.Block:
        return self._block

    def _cached(self) -> DataFrame:
        return DataFrame(self._block.cached())
3 changes: 3 additions & 0 deletions bigframes/series.py
@@ -1389,6 +1389,9 @@ def _slice(
            ),
        )

    def _cached(self) -> Series:
        return Series(self._block.cached())


def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
    return pandas.api.types.is_list_like(obj)
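With Block.cached in place, DataFrame._cached and Series._cached are thin wrappers: each writes its block to a session table once and wraps the resulting block in a fresh DataFrame or Series. Both are private helpers rather than public API, so the following is only a hedged usage sketch; the public table name and the intermediate pipeline are arbitrary examples, not taken from the PR.

# Hedged usage sketch of the private _cached helper; the table name and the
# groupby pipeline are illustrative, and _cached is internal bigframes API.
import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
expensive = df[["name", "number"]].groupby("name").sum().sort_values("number")

# Materialize the intermediate result into a session table once...
expensive = expensive._cached()

# ...so both downstream calls scan the cached table instead of re-running the
# groupby/sort pipeline.
preview = expensive.head(10).to_pandas()
full = expensive.to_pandas()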
10 changes: 10 additions & 0 deletions tests/system/small/test_dataframe.py
@@ -2717,3 +2717,13 @@ def test_query_job_setters(scalars_df_default_index: dataframe.DataFrame):
    job_ids.add(scalars_df_default_index.query_job.job_id)

    assert len(job_ids) == 2


def test_df_cached(scalars_df_index):
    df = scalars_df_index.set_index(["int64_too", "int64_col"]).sort_values(
        "string_col"
    )
    df = df[df["rowindex_2"] % 2 == 0]

    df_cached_copy = df._cached()
    pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas())
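The system test above exercises the DataFrame path only. A symmetric check for Series._cached, sketched here under the assumption that the same scalars_df_index fixture and imports apply (it is not part of the shown diff), might look like:

# Hypothetical companion test mirroring test_df_cached for the Series path.
def test_series_cached(scalars_df_index):
    series = scalars_df_index.set_index("int64_too")["string_col"]

    series_cached_copy = series._cached()
    pandas.testing.assert_series_equal(
        series.to_pandas(), series_cached_copy.to_pandas()
    )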