feat: add df.unstack #63

Merged 4 commits on Sep 29, 2023
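For context, a sketch of the user-facing behavior this PR builds toward; it assumes `bigframes.pandas.read_pandas` accepts a MultiIndexed frame at this commit, and the data is illustrative:

```python
import bigframes.pandas as bpd
import pandas as pd

pdf = pd.DataFrame(
    {"value": [1, 2, 3, 4]},
    index=pd.MultiIndex.from_product([["a", "b"], [1, 2]]),
)
df = bpd.read_pandas(pdf)

# Move the innermost row-index level up into the column labels,
# mirroring pandas.DataFrame.unstack.
wide = df.unstack()
```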
113 changes: 79 additions & 34 deletions bigframes/core/__init__.py
@@ -963,10 +963,11 @@ def unpivot(
],
*,
passthrough_columns: typing.Sequence[str] = (),
index_col_id: str = "index",
index_col_ids: typing.Sequence[str] = ["index"],
dtype: typing.Union[
bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
] = pandas.Float64Dtype(),
how="left",
) -> ArrayValue:
"""
Unpivot ArrayValue columns.
@@ -981,8 +982,11 @@ def unpivot(
Returns:
ArrayValue: The unpivoted ArrayValue
"""
table = self._to_ibis_expr(ordering_mode="offset_col")
if how not in ("left", "right"):
raise ValueError("'how' must be 'left' or 'right'")
table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
row_n = len(row_labels)
hidden_col_ids = self._hidden_ordering_column_names.keys()
if not all(
len(source_columns) == row_n for _, source_columns in unpivot_columns
):
@@ -992,33 +996,44 @@
unpivot_table = table.cross_join(
ibis.memtable({unpivot_offset_id: range(row_n)})
)
unpivot_offsets_value = (
(
(unpivot_table[ORDER_ID_COLUMN] * row_n)
+ unpivot_table[unpivot_offset_id]
)
.cast(ibis_dtypes.int64)
.name(ORDER_ID_COLUMN),
)

# Use ibis memtable to infer type of rowlabels (if possible)
# TODO: Allow caller to specify dtype
labels_ibis_type = ibis.memtable({"col": row_labels})["col"].type()
labels_dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(labels_ibis_type)
cases = [
(
i,
bigframes.dtypes.literal_to_ibis_scalar(
row_labels[i], force_dtype=labels_dtype # type:ignore
),
)
for i in range(len(row_labels))
if isinstance(row_labels[0], tuple):
labels_table = ibis.memtable(row_labels)
labels_ibis_types = [
labels_table[col].type() for col in labels_table.columns
]
else:
labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()]
labels_dtypes = [
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type)
for ibis_type in labels_ibis_types
]
labels_value = (
typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
.cases(cases, default=None) # type:ignore
.name(index_col_id)
)

label_columns = []
for label_part, (col_id, label_dtype) in enumerate(
zip(index_col_ids, labels_dtypes)
):
# Interpret labels as tuples even if they weren't originally, so the same logic applies to multi-column labels
labels_as_tuples = [
label if isinstance(label, tuple) else (label,) for label in row_labels
]
cases = [
(
i,
bigframes.dtypes.literal_to_ibis_scalar(
label_tuple[label_part], # type:ignore
force_dtype=label_dtype, # type:ignore
),
)
for i, label_tuple in enumerate(labels_as_tuples)
]
labels_value = (
typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
.cases(cases, default=None) # type:ignore
.name(col_id)
)
label_columns.append(labels_value)

unpivot_values = []
for j in range(len(unpivot_columns)):
@@ -1042,23 +1057,53 @@ def unpivot(
unpivot_values.append(unpivot_value.name(result_col))

unpivot_table = unpivot_table.select(
passthrough_columns, labels_value, *unpivot_values, unpivot_offsets_value
passthrough_columns,
*label_columns,
*unpivot_values,
*hidden_col_ids,
unpivot_offset_id,
)

# Extend the original ordering using unpivot_offset_id
old_ordering = self._ordering
if how == "left":
new_ordering = ExpressionOrdering(
ordering_value_columns=[
*old_ordering.ordering_value_columns,
OrderingColumnReference(unpivot_offset_id),
],
total_ordering_columns=frozenset(
[*old_ordering.total_ordering_columns, unpivot_offset_id]
),
)
else: # how=="right"
new_ordering = ExpressionOrdering(
ordering_value_columns=[
OrderingColumnReference(unpivot_offset_id),
*old_ordering.ordering_value_columns,
],
total_ordering_columns=frozenset(
[*old_ordering.total_ordering_columns, unpivot_offset_id]
),
)
value_columns = [
unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns
]
passthrough_values = [unpivot_table[col] for col in passthrough_columns]
hidden_ordering_columns = [
unpivot_table[unpivot_offset_id],
*[unpivot_table[hidden_col] for hidden_col in hidden_col_ids],
]
return ArrayValue(
session=self._session,
table=unpivot_table,
columns=[unpivot_table[index_col_id], *value_columns, *passthrough_values],
hidden_ordering_columns=[unpivot_table[ORDER_ID_COLUMN]],
ordering=ExpressionOrdering(
ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
),
columns=[
*[unpivot_table[col_id] for col_id in index_col_ids],
*value_columns,
*passthrough_values,
],
hidden_ordering_columns=hidden_ordering_columns,
ordering=new_ordering,
)

def assign(self, source_id: str, destination_id: str) -> ArrayValue:
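The heart of the `unpivot` change: row labels may now be tuples taken from a `MultiIndex`, and the function emits one generated index column per label level, each built as a CASE expression over the cross-join offset. A minimal pure-Python sketch of that label handling, with illustrative values and no ibis:

```python
# Every row label is normalized to a tuple, then one generated index
# column per label level is built by indexing into the tuple at that
# level. `row_labels` and `index_col_ids` are illustrative values.

row_labels = [("a", 1), ("a", 2), ("b", 1)]  # e.g. from a 2-level MultiIndex
index_col_ids = ["index_0", "index_1"]

# Interpret plain labels as 1-tuples so one code path handles both cases.
labels_as_tuples = [
    label if isinstance(label, tuple) else (label,) for label in row_labels
]

# offset -> label value, one mapping per level; in the real code each
# mapping becomes an ibis CASE keyed on the cross-join offset column.
label_columns = {
    col_id: {i: t[level] for i, t in enumerate(labels_as_tuples)}
    for level, col_id in enumerate(index_col_ids)
}

assert label_columns["index_0"] == {0: "a", 1: "a", 2: "b"}
assert label_columns["index_1"] == {0: 1, 1: 2, 2: 1}
```

The new `how` parameter then decides where the unpivot offset sits in the extended total ordering: `how="left"` appends it, so each original row's unpivoted values stay adjacent, while `how="right"` prepends it, grouping output rows by label first.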
143 changes: 55 additions & 88 deletions bigframes/core/blocks.py
@@ -838,7 +838,7 @@ def aggregate_all_and_stack(
]
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
row_labels=self.column_labels.to_list(),
index_col_id="index",
index_col_ids=["index"],
unpivot_columns=[(value_col_id, self.value_columns)],
dtype=dtype,
)
@@ -849,7 +849,7 @@
expr_with_offsets, offset_col = self.expr.promote_offsets()
stacked_expr = expr_with_offsets.unpivot(
row_labels=self.column_labels.to_list(),
index_col_id=guid.generate_guid(),
index_col_ids=[guid.generate_guid()],
unpivot_columns=[(value_col_id, self.value_columns)],
passthrough_columns=[*self.index_columns, offset_col],
dtype=dtype,
@@ -1041,7 +1041,7 @@ def summarize(
expr = self.expr.aggregate(aggregations).unpivot(
labels,
unpivot_columns=columns,
index_col_id=label_col_id,
index_col_ids=[label_col_id],
)
labels = self._get_labels_for_columns(column_ids)
return Block(expr, column_labels=labels, index_columns=[label_col_id])
@@ -1225,116 +1225,83 @@ def pivot(

return result_block.with_column_labels(column_index)

def stack(self):
def stack(self, how="left", dropna=True, sort=True, levels: int = 1):
"""Unpivot last column axis level into row axis"""
if isinstance(self.column_labels, pd.MultiIndex):
return self._stack_multi()
else:
return self._stack_mono()

def _stack_mono(self):
if isinstance(self.column_labels, pd.MultiIndex):
raise ValueError("Expected single level index")

# These are the values that will be turned into rows
stack_values = self.column_labels.drop_duplicates().sort_values()

# Get matching columns
unpivot_columns: List[Tuple[str, List[str]]] = []
dtypes: List[bigframes.dtypes.Dtype] = []
col_id = guid.generate_guid("unpivot_")
dtype = None
input_columns: Sequence[Optional[str]] = []
for uvalue in stack_values:
matching_ids = self.label_to_col_id.get(uvalue, [])
input_id = matching_ids[0] if len(matching_ids) > 0 else None
if input_id:
if dtype and dtype != self._column_type(input_id):
raise NotImplementedError(
"Cannot stack columns with non-matching dtypes."
)
else:
dtype = self._column_type(input_id)
input_columns.append(input_id)
unpivot_columns.append((col_id, input_columns))
if dtype:
dtypes.append(dtype or pd.Float64Dtype())
col_labels, row_labels = utils.split_index(self.column_labels, levels=levels)
if dropna:
row_labels = row_labels.drop_duplicates()
if sort:
row_labels = row_labels.sort_values()

added_index_column = col_id = guid.generate_guid()
unpivot_expr = self._expr.unpivot(
row_labels=stack_values,
passthrough_columns=self.index_columns,
unpivot_columns=unpivot_columns,
index_col_id=added_index_column,
dtype=dtypes,
)
block = Block(
unpivot_expr,
index_columns=[*self.index_columns, added_index_column],
column_labels=[None],
index_labels=[*self._index_labels, self.column_labels.names[-1]],
)
return block

def _stack_multi(self):
if not isinstance(self.column_labels, pd.MultiIndex):
raise ValueError("Expected multi-index")

# These are the values that will be turned into rows
stack_values = (
self.column_labels.get_level_values(-1).drop_duplicates().sort_values()
)
row_label_tuples = utils.index_as_tuples(row_labels)

result_col_labels = (
self.column_labels.droplevel(-1)
.drop_duplicates()
.sort_values()
.dropna(how="all")
)
if col_labels is not None:
result_index = col_labels.drop_duplicates().sort_values().dropna(how="all")
result_col_labels = utils.index_as_tuples(result_index)
else:
result_index = pd.Index([None])
result_col_labels = list([()])

# Get matching columns
unpivot_columns: List[Tuple[str, List[str]]] = []
dtypes = []
for val in result_col_labels:
col_id = guid.generate_guid("unpivot_")
dtype = None
input_columns: Sequence[Optional[str]] = []
for uvalue in stack_values:
# Need to unpack if still a multi-index after dropping 1 level
label_to_match = (
(val, uvalue) if result_col_labels.nlevels == 1 else (*val, uvalue)
)
matching_ids = self.label_to_col_id.get(label_to_match, [])
input_id = matching_ids[0] if len(matching_ids) > 0 else None
if input_id:
if dtype and dtype != self._column_type(input_id):
raise NotImplementedError(
"Cannot stack columns with non-matching dtypes."
)
else:
dtype = self._column_type(input_id)
input_columns.append(input_id)
# Input column i is the first source column whose label matches stack label i, if any
input_columns, dtype = self._create_stack_column(val, row_label_tuples)
unpivot_columns.append((col_id, input_columns))
if dtype:
dtypes.append(dtype or pd.Float64Dtype())

added_index_column = col_id = guid.generate_guid()
added_index_columns = [guid.generate_guid() for _ in range(row_labels.nlevels)]
unpivot_expr = self._expr.unpivot(
row_labels=stack_values,
row_labels=row_label_tuples,
passthrough_columns=self.index_columns,
unpivot_columns=unpivot_columns,
index_col_id=added_index_column,
index_col_ids=added_index_columns,
dtype=dtypes,
how=how,
)
new_index_level_names = self.column_labels.names[-levels:]
if how == "left":
index_columns = [*self.index_columns, *added_index_columns]
index_labels = [*self._index_labels, *new_index_level_names]
else:
index_columns = [*added_index_columns, *self.index_columns]
index_labels = [*new_index_level_names, *self._index_labels]

block = Block(
unpivot_expr,
index_columns=[*self.index_columns, added_index_column],
column_labels=result_col_labels,
index_labels=[*self._index_labels, self.column_labels.names[-1]],
index_columns=index_columns,
column_labels=result_index,
index_labels=index_labels,
)
return block

def _create_stack_column(
self, col_label: typing.Tuple, stack_labels: typing.Sequence[typing.Tuple]
):
dtype = None
input_columns: list[Optional[str]] = []
for uvalue in stack_labels:
label_to_match = (*col_label, *uvalue)
label_to_match = (
label_to_match[0] if len(label_to_match) == 1 else label_to_match
)
matching_ids = self.label_to_col_id.get(label_to_match, [])
input_id = matching_ids[0] if len(matching_ids) > 0 else None
if input_id:
if dtype and dtype != self._column_type(input_id):
raise NotImplementedError(
"Cannot stack columns with non-matching dtypes."
)
else:
dtype = self._column_type(input_id)
input_columns.append(input_id)
# Input column i is the first source column whose label matches stack label i, if any
return input_columns, dtype or pd.Float64Dtype()

def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:
col_offset = self.value_columns.index(col_id)
dtype = self.dtypes[col_offset]
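`stack` now handles the single- and multi-level cases through one path: split off the trailing `levels` column-index levels with `split_index`, normalize the labels to tuples, and let `unpivot`'s `how` parameter pick the row order. A rough pandas analogy for the two orders; `df`, `left`, and `right` are illustrative names, not part of this PR:

```python
import pandas as pd

df = pd.DataFrame(
    [[1, 3], [2, 4]],
    index=["r1", "r2"],
    columns=pd.MultiIndex.from_tuples([("x", "a"), ("x", "b")]),
)

left = df.stack()
# how="left" order -- the stacked label varies fastest, so each original
# row's values stay adjacent: (r1, a), (r1, b), (r2, a), (r2, b)

right = df.stack().sort_index(level=-1, sort_remaining=False)
# how="right" order -- output rows grouped by the stacked label first:
# (r1, a), (r2, a), (r1, b), (r2, b)
```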
20 changes: 20 additions & 0 deletions bigframes/core/utils.py
@@ -49,6 +49,26 @@ def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex:
return multi_index


def index_as_tuples(index: pd.Index) -> typing.Sequence[typing.Tuple]:
if isinstance(index, pd.MultiIndex):
return [label for label in index]
else:
return [(label,) for label in index]


def split_index(
index: pd.Index, levels: int = 1
) -> typing.Tuple[typing.Optional[pd.Index], pd.Index]:
nlevels = index.nlevels
remaining = nlevels - levels
if remaining > 0:
return index.droplevel(list(range(remaining, nlevels))), index.droplevel(
list(range(0, remaining))
)
else:
return (None, index)


def get_standardized_ids(
col_labels: Iterable[Hashable], idx_labels: Iterable[Hashable] = ()
) -> tuple[list[str], list[str]]:
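A small usage sketch of the two new helpers, assuming they are imported from `bigframes.core.utils` at this commit:

```python
import pandas as pd

from bigframes.core.utils import index_as_tuples, split_index

multi = pd.MultiIndex.from_tuples(
    [("a", 1), ("a", 2), ("b", 1)], names=["outer", "inner"]
)
flat = pd.Index(["x", "y"])

# index_as_tuples always yields tuples, so multi- and single-level
# indexes can share one code path downstream:
index_as_tuples(multi)  # [("a", 1), ("a", 2), ("b", 1)]
index_as_tuples(flat)   # [("x",), ("y",)]

# split_index peels the trailing `levels` levels off for stacking:
remaining, stacked = split_index(multi, levels=1)
# remaining: Index(["a", "a", "b"], name="outer")
# stacked:   Index([1, 2, 1], name="inner")

# With nothing left over, the remaining index is None:
split_index(flat, levels=1)  # (None, Index(["x", "y"]))
```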