Skip to content

feat: add multi-column dataframe merge #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import bigframes.core as core
import bigframes.core.guid as guid
import bigframes.core.indexes as indexes
import bigframes.core.joins as joins
import bigframes.core.ordering as ordering
import bigframes.core.utils
import bigframes.core.utils as utils
Expand Down Expand Up @@ -1403,6 +1404,78 @@ def concat(
result_block = result_block.reset_index()
return result_block

def merge(
    self,
    other: Block,
    how: typing.Literal[
        "inner",
        "left",
        "outer",
        "right",
    ],
    left_col_ids: typing.Sequence[str],
    right_col_ids: typing.Sequence[str],
    sort: bool,
    suffixes: tuple[str, str] = ("_x", "_y"),
) -> Block:
    """Join this block with ``other`` on one or more pairs of key columns.

    The join itself is delegated to ``joins.join_by_column``. Key pairs are
    matched positionally (``left_col_ids[i]`` with ``right_col_ids[i]``).
    When the labels of a key pair have the same string representation, the
    pair is emitted as a single coalesced column and the right-hand key
    column is dropped; otherwise both key columns are kept.

    Args:
        other: The right-hand block of the join.
        how: Join type, forwarded to ``joins.join_by_column``.
        left_col_ids: Column IDs (not labels) of the join keys in ``self``.
        right_col_ids: Column IDs (not labels) of the join keys in ``other``.
        sort: Forwarded to ``joins.join_by_column``.
        suffixes: Suffixes passed to ``utils.merge_column_labels`` for
            labels present on both sides that are not coalesced.

    Returns:
        A new block over the joined expression whose index is a freshly
        promoted offsets column.
    """
    (
        joined_expr,
        coalesced_join_cols,
        (get_column_left, get_column_right),
    ) = joins.join_by_column(
        self.expr,
        left_col_ids,
        other.expr,
        right_col_ids,
        how=how,
        sort=sort,
    )

    # Decide, per join key pair, whether the two key columns collapse into
    # one output column (same label on both sides) and remember the labels
    # that do, all in a single pass over the key pairs.
    merge_join_key_mask = []
    labels_to_coalesce = []
    for left_id, right_id in zip(left_col_ids, right_col_ids):
        left_label = self.col_id_to_label[left_id]
        is_same_label = str(left_label) == str(other.col_id_to_label[right_id])
        merge_join_key_mask.append(is_same_label)
        if is_same_label:
            labels_to_coalesce.append(left_label)

    def left_col_mapping(col_id: str) -> str:
        # A coalesced key is read from the combined join-key column;
        # everything else keeps its (remapped) left-hand column.
        if col_id in left_col_ids:
            join_key_part = left_col_ids.index(col_id)
            if merge_join_key_mask[join_key_part]:
                return coalesced_join_cols[join_key_part]
        return get_column_left(col_id)

    def right_col_mapping(col_id: str) -> typing.Optional[str]:
        # A coalesced key is already represented on the left side, so the
        # right-hand copy is dropped (signalled by None).
        if col_id in right_col_ids:
            join_key_part = right_col_ids.index(col_id)
            if merge_join_key_mask[join_key_part]:
                return None
        return get_column_right(col_id)

    left_columns = [left_col_mapping(col_id) for col_id in self.value_columns]

    # Map each right column exactly once and drop only the explicit None
    # marker, rather than relying on the truthiness of the column ID.
    right_columns = [
        mapped
        for mapped in map(right_col_mapping, other.value_columns)
        if mapped is not None
    ]

    expr = joined_expr.select_columns([*left_columns, *right_columns])
    labels = utils.merge_column_labels(
        self.column_labels,
        other.column_labels,
        coalesce_labels=labels_to_coalesce,
        suffixes=suffixes,
    )

    # Constructs default index
    expr, offset_index_id = expr.promote_offsets()
    return Block(expr, index_columns=[offset_index_id], column_labels=labels)

def _force_reproject(self) -> Block:
"""Forces a reprojection of the underlying tables expression. Used to force predicate/order application before subsequent operations."""
return Block(
Expand Down
60 changes: 20 additions & 40 deletions bigframes/core/joins/single_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def join_by_column(
"right",
],
sort: bool = False,
coalesce_join_keys: bool = True,
allow_row_identity_join: bool = True,
) -> Tuple[
core.ArrayValue,
Expand All @@ -59,8 +58,6 @@ def join_by_column(
right: Expression for right table to join.
right_column_ids: Column IDs (not label) to join by.
how: The type of join to perform.
coalesce_join_keys: if set to False, returned column ids will contain
both left and right join key columns.
allow_row_identity_join (bool):
If True, allow matching by row identity. Set to False to always
perform a true JOIN in generated SQL.
Expand All @@ -71,8 +68,6 @@ def join_by_column(
* Sequence[str]: Column IDs of the coalesced join columns. Sometimes either the
left/right table will have missing rows. This column pulls the
non-NULL value from either left/right.
If coalesce_join_keys is False, will return uncombined left and
right key columns.
* Tuple[Callable, Callable]: For a given column ID from left or right,
respectively, return the new column id from the combined expression.
"""
Expand Down Expand Up @@ -100,9 +95,7 @@ def join_by_column(
right_join_keys = [
combined_expr.get_column(get_column_right(col)) for col in right_column_ids
]
join_key_cols = get_join_cols(
left_join_keys, right_join_keys, how, coalesce_join_keys
)
join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how)
join_key_ids = [col.get_name() for col in join_key_cols]
combined_expr = combined_expr.projection(
[*join_key_cols, *combined_expr.columns]
Expand Down Expand Up @@ -182,9 +175,7 @@ def get_column_right(col_id):
right_join_keys = [
combined_table[get_column_right(col)] for col in right_column_ids
]
join_key_cols = get_join_cols(
left_join_keys, right_join_keys, how, coalesce_join_keys
)
join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how)
# We could filter out the original join columns, but predicates/ordering
# might still reference them in implicit joins.
columns = (
Expand Down Expand Up @@ -226,46 +217,35 @@ def get_column_right(col_id):
)


def get_join_cols(
def get_coalesced_join_cols(
left_join_cols: typing.Iterable[ibis_types.Value],
right_join_cols: typing.Iterable[ibis_types.Value],
how: str,
coalesce_join_keys: bool = True,
) -> typing.List[ibis_types.Value]:
join_key_cols: list[ibis_types.Value] = []
for left_col, right_col in zip(left_join_cols, right_join_cols):
if not coalesce_join_keys:
if how == "left" or how == "inner":
join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_")))
elif how == "right":
join_key_cols.append(right_col.name(guid.generate_guid(prefix="index_")))
else:
if how == "left" or how == "inner":
elif how == "outer":
# The left index and the right index might contain null values, for
# example due to an outer join with different numbers of rows. Coalesce
# these to take the index value from either column.
# Use a random name in case the left index and the right index have the
# same name. In such a case, _x and _y suffixes will already be used.
# Don't need to coalesce if they are exactly the same column.
if left_col.name("index").equals(right_col.name("index")):
join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_")))
elif how == "right":
join_key_cols.append(
right_col.name(guid.generate_guid(prefix="index_"))
)
elif how == "outer":
# The left index and the right index might contain null values, for
# example due to an outer join with different numbers of rows. Coalesce
# these to take the index value from either column.
# Use a random name in case the left index and the right index have the
# same name. In such a case, _x and _y suffixes will already be used.
# Don't need to coalesce if they are exactly the same column.
if left_col.name("index").equals(right_col.name("index")):
join_key_cols.append(
left_col.name(guid.generate_guid(prefix="index_"))
)
else:
join_key_cols.append(
ibis.coalesce(
left_col,
right_col,
).name(guid.generate_guid(prefix="index_"))
)
else:
raise ValueError(
f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}"
join_key_cols.append(
ibis.coalesce(
left_col,
right_col,
).name(guid.generate_guid(prefix="index_"))
)
else:
raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}")
return join_key_cols


Expand Down
33 changes: 33 additions & 0 deletions bigframes/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,36 @@ def get_standardized_ids(
idx_ids, col_ids = ids[: len(idx_ids)], ids[len(idx_ids) :]

return col_ids, idx_ids


def merge_column_labels(
    left_labels: pd.Index,
    right_labels: pd.Index,
    coalesce_labels: typing.Sequence,
    suffixes: tuple[str, str] = ("_x", "_y"),
) -> pd.Index:
    """Compute the column labels of a merge result.

    Labels that occur on only one side are kept unchanged. A label that
    occurs on both sides is either coalesced (listed in ``coalesce_labels``),
    in which case it appears once, in the left-hand position, or it is
    disambiguated by appending ``suffixes[0]`` to the left occurrence and
    ``suffixes[1]`` to the right occurrence of its string form.

    Args:
        left_labels: Labels of the left table, in output order.
        right_labels: Labels of the right table, in output order.
        coalesce_labels: Labels of join keys that collapse into one column.
        suffixes: ``(left_suffix, right_suffix)`` for clashing labels.

    Returns:
        The left-derived labels followed by the right-derived labels.
    """
    merged = []

    # Left side: every label survives, possibly with the left suffix.
    for label in left_labels:
        needs_suffix = label in right_labels and label not in coalesce_labels
        merged.append(str(label) + suffixes[0] if needs_suffix else label)

    # Right side: coalesced keys are already present via the left side.
    for label in right_labels:
        if label not in left_labels:
            merged.append(label)
        elif label not in coalesce_labels:
            merged.append(str(label) + suffixes[1])

    return pd.Index(merged)
127 changes: 34 additions & 93 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import bigframes.core.indexers as indexers
import bigframes.core.indexes as indexes
import bigframes.core.io
import bigframes.core.joins as joins
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.window
Expand Down Expand Up @@ -1779,12 +1778,10 @@ def merge(
] = "inner",
# TODO(garrettwu): Currently can take inner, outer, left and right. To support
# cross joins
# TODO(garrettwu): Support "on" list of columns and None. Currently a single
# column must be provided
on: Optional[str] = None,
on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
*,
left_on: Optional[str] = None,
right_on: Optional[str] = None,
left_on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
right_on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
sort: bool = False,
suffixes: tuple[str, str] = ("_x", "_y"),
) -> DataFrame:
Expand All @@ -1798,97 +1795,41 @@ def merge(
)
left_on, right_on = on, on

left = self
left_on_sql = self._sql_names(left_on)
# 0 elements already throws an exception
if len(left_on_sql) > 1:
raise ValueError(f"The column label {left_on} is not unique.")
left_on_sql = left_on_sql[0]

right_on_sql = right._sql_names(right_on)
if len(right_on_sql) > 1:
raise ValueError(f"The column label {right_on} is not unique.")
right_on_sql = right_on_sql[0]

(
joined_expr,
join_key_ids,
(get_column_left, get_column_right),
) = joins.join_by_column(
left._block.expr,
[left_on_sql],
right._block.expr,
[right_on_sql],
how=how,
sort=sort,
# In merging on the same column, it only returns 1 key column from coalesced both.
# While if 2 different columns, both will be presented in the result.
coalesce_join_keys=(left_on == right_on),
)
# TODO(swast): Add suffixes to the column labels instead of reusing the
# column IDs as the new labels.
# Drop the index column(s) to be consistent with pandas.
left_columns = [
join_key_ids[0] if (col_id == left_on_sql) else get_column_left(col_id)
for col_id in left._block.value_columns
]

right_columns = []
for col_id in right._block.value_columns:
if col_id == right_on_sql:
# When left_on == right_on
if len(join_key_ids) > 1:
right_columns.append(join_key_ids[1])
else:
right_columns.append(get_column_right(col_id))

expr = joined_expr.select_columns([*left_columns, *right_columns])
labels = self._get_merged_col_labels(
right, left_on=left_on, right_on=right_on, suffixes=suffixes
)
if utils.is_list_like(left_on):
left_on = list(left_on) # type: ignore
else:
left_on = [left_on]

# Constructs default index
expr, offset_index_id = expr.promote_offsets()
block = blocks.Block(
expr, index_columns=[offset_index_id], column_labels=labels
if utils.is_list_like(right_on):
right_on = list(right_on) # type: ignore
else:
right_on = [right_on]

left_join_ids = []
for label in left_on: # type: ignore
left_col_id = self._resolve_label_exact(label)
# 0 elements already throws an exception
if not left_col_id:
raise ValueError(f"No column {label} found in self.")
left_join_ids.append(left_col_id)

right_join_ids = []
for label in right_on: # type: ignore
right_col_id = right._resolve_label_exact(label)
if not right_col_id:
raise ValueError(f"No column {label} found in other.")
right_join_ids.append(right_col_id)

block = self._block.merge(
right._block,
how,
left_join_ids,
right_join_ids,
sort=sort,
suffixes=suffixes,
)
return DataFrame(block)

def _get_merged_col_labels(
    self,
    right: DataFrame,
    left_on: str,
    right_on: str,
    suffixes: tuple[str, str] = ("_x", "_y"),
) -> List[blocks.Label]:
    """Build the result column labels for a single-key merge with ``right``.

    Labels found on only one side are kept as-is. A label found on both
    sides gets ``suffixes[0]`` (left) or ``suffixes[1]`` (right) appended to
    its string form, except for the join key when ``left_on == right_on``:
    that key appears once, in its left-hand position.

    Returns:
        The left-derived labels followed by the right-derived labels.
    """
    same_key = left_on == right_on
    own_labels = self._block.column_labels
    other_labels = right._block.column_labels

    def is_coalesced_key(label) -> bool:
        # Merging on the same column yields a single key column.
        return same_key and label == left_on

    merged: list[blocks.Label] = []

    # Left side: the shared key keeps its label, other clashes get a suffix.
    for label in own_labels:
        if label in other_labels and not is_coalesced_key(label):
            merged.append(str(label) + suffixes[0])
        else:
            merged.append(label)

    # Right side: the shared key is skipped because the left side holds it.
    for label in other_labels:
        if label not in own_labels:
            merged.append(label)
        elif not is_coalesced_key(label):
            merged.append(str(label) + suffixes[1])

    return merged

def join(
self, other: DataFrame, *, on: Optional[str] = None, how: str = "left"
) -> DataFrame:
Expand Down
Loading