Skip to content

Commit 499f24a

Browse files
authored
perf: improve series.unique performance and replace drop_duplicates in tpch test (#1108)
* perf: improve series.unique performance and replace drop_duplicates in tpch test. * change variable name * update docstring and index. * update flag to control behavior. * update examples * delete empty line * keep order error * reduce query_count * improve q16 * benchmark updates
1 parent 254875c commit 499f24a

File tree

10 files changed

+84
-45
lines changed

10 files changed

+84
-45
lines changed

bigframes/series.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,9 +1609,16 @@ def drop_duplicates(self, *, keep: str = "first") -> Series:
16091609
block = block_ops.drop_duplicates(self._block, (self._value_column,), keep)
16101610
return Series(block)
16111611

1612-
@validations.requires_ordering()
1613-
def unique(self) -> Series:
1614-
return self.drop_duplicates()
1612+
def unique(self, keep_order=True) -> Series:
1613+
if keep_order:
1614+
validations.enforce_ordered(self, "unique(keep_order != False)")
1615+
return self.drop_duplicates()
1616+
block, result = self._block.aggregate(
1617+
[self._value_column],
1618+
[(self._value_column, agg_ops.AnyValueOp())],
1619+
dropna=False,
1620+
)
1621+
return Series(block.select_columns(result).reset_index())
16151622

16161623
def duplicated(self, keep: str = "first") -> Series:
16171624
if keep is not False:

third_party/bigframes_vendored/pandas/core/series.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -645,13 +645,18 @@ def nunique(self) -> int:
645645
"""
646646
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
647647

648-
def unique(self) -> Series:
648+
def unique(self, keep_order=True) -> Series:
649649
"""
650650
Return unique values of Series object.
651651
652-
Uniques are returned in order of appearance. Hash table-based unique,
652+
By default, uniques are returned in order of appearance. Hash table-based unique,
653653
therefore does NOT sort.
654654
655+
Args:
656+
keep_order (bool, default True):
657+
If True, preserves the order of the first appearance of each unique value.
658+
If False, returns the elements in ascending order, which can be faster.
659+
655660
**Examples:**
656661
657662
>>> import bigframes.pandas as bpd
@@ -664,12 +669,21 @@ def unique(self) -> Series:
664669
2 3
665670
3 3
666671
Name: A, dtype: Int64
672+
673+
Example with order preservation: Slower, but keeps order
667674
>>> s.unique()
668675
0 2
669676
1 1
670677
2 3
671678
Name: A, dtype: Int64
672679
680+
Example without order preservation: Faster, but loses original order
681+
>>> s.unique(keep_order=False)
682+
0 1
683+
1 2
684+
2 3
685+
Name: A, dtype: Int64
686+
673687
Returns:
674688
Series: The unique values returned as a Series.
675689
"""

third_party/bigframes_vendored/tpch/queries/q10.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
2828
var1 = date(1993, 10, 1)
2929
var2 = date(1994, 1, 1)
3030

31-
q_final = customer.merge
32-
3331
q_final = (
3432
customer.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
3533
.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
@@ -61,15 +59,21 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
6159
as_index=False,
6260
).agg(REVENUE=bpd.NamedAgg(column="INTERMEDIATE_REVENUE", aggfunc="sum"))
6361

64-
q_final[
65-
[
66-
"C_CUSTKEY",
67-
"C_NAME",
68-
"REVENUE",
69-
"C_ACCTBAL",
70-
"N_NAME",
71-
"C_ADDRESS",
72-
"C_PHONE",
73-
"C_COMMENT",
62+
q_final = (
63+
q_final[
64+
[
65+
"C_CUSTKEY",
66+
"C_NAME",
67+
"REVENUE",
68+
"C_ACCTBAL",
69+
"N_NAME",
70+
"C_ADDRESS",
71+
"C_PHONE",
72+
"C_COMMENT",
73+
]
7474
]
75-
].sort_values(by="REVENUE", ascending=False).head(20).to_gbq()
75+
.sort_values(by="REVENUE", ascending=False)
76+
.head(20)
77+
)
78+
79+
q_final.to_gbq()

third_party/bigframes_vendored/tpch/queries/q11.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,16 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
3030

3131
grouped["VALUE"] = grouped["VALUE"].round(2)
3232

33-
total_value = (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).sum()
34-
threshold = total_value * 0.0001
33+
total_value = (
34+
(filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).to_frame().sum()
35+
)
36+
threshold = (total_value * 0.0001).rename("THRESHOLD")
37+
38+
grouped = grouped.merge(threshold, how="cross")
3539

36-
result_df = grouped[grouped["VALUE"] > threshold]
40+
result_df = grouped[grouped["VALUE"] > grouped["THRESHOLD"]].drop(
41+
columns="THRESHOLD"
42+
)
3743

3844
result_df = result_df.sort_values(by="VALUE", ascending=False)
3945

third_party/bigframes_vendored/tpch/queries/q15.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
3636
supplier, grouped_revenue, left_on="S_SUPPKEY", right_on="SUPPLIER_NO"
3737
)
3838

39-
max_revenue = joined_data["TOTAL_REVENUE"].max()
40-
max_revenue_suppliers = joined_data[joined_data["TOTAL_REVENUE"] == max_revenue]
39+
max_revenue = joined_data[["TOTAL_REVENUE"]].max().rename("MAX_REVENUE")
40+
41+
joined_data = joined_data.merge(max_revenue, how="cross")
42+
43+
max_revenue_suppliers = joined_data[
44+
joined_data["TOTAL_REVENUE"] == joined_data["MAX_REVENUE"]
45+
]
4146

4247
max_revenue_suppliers["TOTAL_REVENUE"] = max_revenue_suppliers[
4348
"TOTAL_REVENUE"

third_party/bigframes_vendored/tpch/queries/q16.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,22 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
2020

2121
var1 = "Brand#45"
2222

23-
supplier = supplier[
24-
supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True)
25-
]["S_SUPPKEY"]
23+
supplier = (
24+
supplier[
25+
~supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True)
26+
]["S_SUPPKEY"]
27+
.unique(keep_order=False)
28+
.to_frame()
29+
)
2630

2731
q_filtered = part.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY")
2832
q_filtered = q_filtered[q_filtered["P_BRAND"] != var1]
2933
q_filtered = q_filtered[~q_filtered["P_TYPE"].str.contains("MEDIUM POLISHED")]
3034
q_filtered = q_filtered[q_filtered["P_SIZE"].isin([49, 14, 23, 45, 19, 3, 36, 9])]
3135

32-
final_df = q_filtered[~q_filtered["PS_SUPPKEY"].isin(supplier)]
36+
final_df = q_filtered.merge(
37+
supplier, left_on=["PS_SUPPKEY"], right_on=["S_SUPPKEY"]
38+
)
3339

3440
grouped = final_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False)
3541
result = grouped.agg(

third_party/bigframes_vendored/tpch/queries/q17.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
3333

3434
q_final = q_final[q_final["L_QUANTITY"] < q_final["AVG_QUANTITY"]]
3535

36-
q_final = bpd.DataFrame(
37-
{"AVG_YEARLY": [(q_final["L_EXTENDEDPRICE"].sum() / 7.0).round(2)]}
36+
q_final = (
37+
(q_final[["L_EXTENDEDPRICE"]].sum() / 7.0).round(2).to_frame(name="AVG_YEARLY")
3838
)
3939

4040
q_final.to_gbq()

third_party/bigframes_vendored/tpch/queries/q20.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
4646

4747
if not session._strictly_ordered:
4848
filtered_parts = filtered_parts[["P_PARTKEY"]].sort_values(by=["P_PARTKEY"])
49-
filtered_parts = filtered_parts[["P_PARTKEY"]].drop_duplicates()
49+
filtered_parts = filtered_parts["P_PARTKEY"].unique(keep_order=False).to_frame()
5050
joined_parts = filtered_parts.merge(
5151
partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY"
5252
)
@@ -56,10 +56,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
5656
)
5757
final_filtered = final_join[final_join["PS_AVAILQTY"] > final_join["SUM_QUANTITY"]]
5858

59-
final_filtered = final_filtered[["PS_SUPPKEY"]]
60-
if not session._strictly_ordered:
61-
final_filtered = final_filtered.sort_values(by="PS_SUPPKEY")
62-
final_filtered = final_filtered.drop_duplicates()
59+
final_filtered = final_filtered["PS_SUPPKEY"].unique(keep_order=False).to_frame()
6360

6461
final_result = final_filtered.merge(q3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
6562
final_result = final_result[["S_NAME", "S_ADDRESS"]].sort_values(by="S_NAME")

third_party/bigframes_vendored/tpch/queries/q22.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
1818

1919
customer["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2)
2020

21-
avg_acctbal = customer[
22-
(customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0)
23-
]["C_ACCTBAL"].mean()
21+
avg_acctbal = (
22+
customer[
23+
(customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0)
24+
][["C_ACCTBAL"]]
25+
.mean()
26+
.rename("AVG_ACCTBAL")
27+
)
2428

25-
if not session._strictly_ordered:
26-
orders = orders.sort_values(by="O_CUSTKEY")
27-
orders_unique = orders.drop_duplicates(subset=["O_CUSTKEY"])
29+
orders_unique = orders["O_CUSTKEY"].unique(keep_order=False).to_frame()
2830

2931
matched_customers = customer.merge(
3032
orders_unique, left_on="C_CUSTKEY", right_on="O_CUSTKEY"
@@ -35,10 +37,11 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
3537
matched_customers[["C_CUSTKEY", "IS_IN_ORDERS"]], on="C_CUSTKEY", how="left"
3638
)
3739
customer["IS_IN_ORDERS"] = customer["IS_IN_ORDERS"].fillna(False)
40+
customer = customer.merge(avg_acctbal, how="cross")
3841

3942
filtered_customers = customer[
4043
(customer["CNTRYCODE"].isin(country_codes))
41-
& (customer["C_ACCTBAL"] > avg_acctbal)
44+
& (customer["C_ACCTBAL"] > customer["AVG_ACCTBAL"])
4245
& (~customer["IS_IN_ORDERS"])
4346
]
4447

third_party/bigframes_vendored/tpch/queries/q4.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
2626
jn = jn[(jn["O_ORDERDATE"] >= var1) & (jn["O_ORDERDATE"] < var2)]
2727
jn = jn[jn["L_COMMITDATE"] < jn["L_RECEIPTDATE"]]
2828

29-
if not session._strictly_ordered:
30-
jn = jn.sort_values(by=["O_ORDERPRIORITY", "L_ORDERKEY"])
31-
32-
jn = jn.drop_duplicates(subset=["O_ORDERPRIORITY", "L_ORDERKEY"])
29+
jn = jn.groupby(["O_ORDERPRIORITY", "L_ORDERKEY"], as_index=False).agg("size")
3330

3431
gb = jn.groupby("O_ORDERPRIORITY", as_index=False)
3532
agg = gb.agg(ORDER_COUNT=bpd.NamedAgg(column="L_ORDERKEY", aggfunc="count"))

0 commit comments

Comments
 (0)