Skip to content

Commit 8eb9d33

Browse files
zblzcpcloud
andauthored
feat(postgres): implement support for asof_join API via a lateral join (#11024)
Co-authored-by: Phillip Cloud <[email protected]>
1 parent 9b25ab1 commit 8eb9d33

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

ibis/backends/sql/compilers/postgres.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,37 @@ def visit_Hash(self, op, *, arg):
739739
f"{self.dialect} backend"
740740
)
741741

742+
def visit_JoinLink(self, op, *, how, table, predicates):
743+
if how == "asof":
744+
# Convert asof join to a lateral left join
745+
746+
# The asof match condition is always the first predicate
747+
match_condition, *predicates = predicates
748+
on = sg.and_(*predicates) if predicates else None
749+
750+
return sge.Join(
751+
this=sge.Lateral(
752+
this=sge.Subquery(
753+
this=sg.select(sge.Star())
754+
.from_(table)
755+
.where(match_condition)
756+
# the ordering for the subquery depends on whether we
757+
# want to pick the one row with the largest or smallest
758+
# value that fulfills the match condition
759+
.order_by(
760+
match_condition.expression.asc()
761+
if match_condition.key in {"lte", "lt"}
762+
else match_condition.expression.desc()
763+
)
764+
.limit(1)
765+
)
766+
).as_(table.alias_or_name),
767+
kind="left",
768+
on=on,
769+
)
770+
771+
return super().visit_JoinLink(op, how=how, table=table, predicates=predicates)
772+
742773
def visit_TableUnnest(
743774
self,
744775
op,

ibis/backends/tests/test_asof_join.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def time_keyed_right(time_keyed_df2):
8686
[
8787
"datafusion",
8888
"trino",
89-
"postgres",
9089
"mysql",
9190
"pyspark",
9291
"druid",
@@ -96,7 +95,6 @@ def time_keyed_right(time_keyed_df2):
9695
"oracle",
9796
"mssql",
9897
"sqlite",
99-
"risingwave",
10098
"flink",
10199
"databricks",
102100
"athena",
@@ -138,9 +136,7 @@ def test_asof_join(con, time_left, time_right, time_df1, time_df2, direction, op
138136
"mssql",
139137
"mysql",
140138
"oracle",
141-
"postgres",
142139
"pyspark",
143-
"risingwave",
144140
"sqlite",
145141
"trino",
146142
"athena",

0 commit comments

Comments
 (0)