Skip to content

perf: reimplement unpivot to use cross join rather than union #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,61 +982,78 @@ def unpivot(
ArrayValue: The unpivoted ArrayValue
"""
table = self._to_ibis_expr(ordering_mode="offset_col")
sub_expressions = []

# Use ibis memtable to infer type of rowlabels (if possible)
# TODO: Allow caller to specify dtype
labels_ibis_type = ibis.memtable({"col": row_labels})["col"].type()
labels_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(labels_ibis_type)

row_n = len(row_labels)
if not all(
len(source_columns) == row_n for _, source_columns in unpivot_columns
):
raise ValueError("Columns and row labels must all be same length.")

for i in range(row_n):
values = []
for j in range(len(unpivot_columns)):
result_col, source_cols = unpivot_columns[j]
col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype
if source_cols[i] is not None:
values.append(
ops.AsTypeOp(col_dtype)
._as_ibis(table[source_cols[i]])
.name(result_col)
)
else:
values.append(
bigframes.dtypes.literal_to_ibis_scalar(
None, force_dtype=col_dtype
).name(result_col)
)
offsets_value = (
((table[ORDER_ID_COLUMN] * row_n) + i)
.cast(ibis_dtypes.int64)
.name(ORDER_ID_COLUMN),
unpivot_offset_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
unpivot_table = table.cross_join(
ibis.memtable({unpivot_offset_id: range(row_n)})
)
unpivot_offsets_value = (
(
(unpivot_table[ORDER_ID_COLUMN] * row_n)
+ unpivot_table[unpivot_offset_id]
)
sub_expr = table.select(
passthrough_columns,
.cast(ibis_dtypes.int64)
.name(ORDER_ID_COLUMN),
)

# Use ibis memtable to infer type of rowlabels (if possible)
# TODO: Allow caller to specify dtype
labels_ibis_type = ibis.memtable({"col": row_labels})["col"].type()
labels_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(labels_ibis_type)
cases = [
(
i,
bigframes.dtypes.literal_to_ibis_scalar(
row_labels[i], force_dtype=labels_dtype # type:ignore
).name(index_col_id),
*values,
offsets_value,
),
)
for i in range(len(row_labels))
]
labels_value = (
typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
.cases(cases, default=None) # type:ignore
.name(index_col_id)
)

unpivot_values = []
for j in range(len(unpivot_columns)):
col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype
result_col, source_cols = unpivot_columns[j]
null_value = bigframes.dtypes.literal_to_ibis_scalar(
None, force_dtype=col_dtype
)
ibis_values = [
ops.AsTypeOp(col_dtype)._as_ibis(unpivot_table[col])
if col is not None
else null_value
for col in source_cols
]
cases = [(i, ibis_values[i]) for i in range(len(ibis_values))]
unpivot_value = typing.cast(
ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id]
).cases(
cases, default=null_value # type:ignore
)
sub_expressions.append(sub_expr)
rotated_table = ibis.union(*sub_expressions)
unpivot_values.append(unpivot_value.name(result_col))

unpivot_table = unpivot_table.select(
passthrough_columns, labels_value, *unpivot_values, unpivot_offsets_value
)

value_columns = [
rotated_table[value_col_id] for value_col_id, _ in unpivot_columns
unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns
]
passthrough_values = [rotated_table[col] for col in passthrough_columns]
passthrough_values = [unpivot_table[col] for col in passthrough_columns]
return ArrayValue(
session=self._session,
table=rotated_table,
columns=[rotated_table[index_col_id], *value_columns, *passthrough_values],
hidden_ordering_columns=[rotated_table[ORDER_ID_COLUMN]],
table=unpivot_table,
columns=[unpivot_table[index_col_id], *value_columns, *passthrough_values],
hidden_ordering_columns=[unpivot_table[ORDER_ID_COLUMN]],
ordering=ExpressionOrdering(
ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
Expand Down