Skip to content

Commit 2db01f9

Browse files
chenzl25, BugenZhao, st1page
authored
feat(streaming): support temporal join part 3 (risingwavelabs#8480)
Co-authored-by: Bugen Zhao <[email protected]>
Co-authored-by: st1page <[email protected]>
1 parent f36bf0b commit 2db01f9

File tree

27 files changed

+456
-21
lines changed

27 files changed

+456
-21
lines changed

e2e_test/streaming/temporal_join.slt

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
statement ok
2+
SET RW_IMPLICIT_FLUSH TO true;
3+
4+
statement ok
5+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
6+
7+
statement ok
8+
create table version(id2 int, a2 int, b2 int, primary key (id2));
9+
10+
statement ok
11+
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2
12+
13+
statement ok
14+
insert into stream values(1, 11, 111);
15+
16+
statement ok
17+
insert into version values(1, 11, 111);
18+
19+
statement ok
20+
insert into stream values(1, 11, 111);
21+
22+
statement ok
23+
delete from version;
24+
25+
query IIII rowsort
26+
select * from v;
27+
----
28+
1 11 1 11
29+
1 11 NULL NULL
30+
31+
statement ok
32+
insert into version values(2, 22, 222);
33+
34+
statement ok
35+
insert into stream values(2, 22, 222);
36+
37+
query IIII rowsort
38+
select * from v;
39+
----
40+
1 11 1 11
41+
1 11 NULL NULL
42+
2 22 2 22
43+
44+
statement ok
45+
drop materialized view v;
46+
47+
statement ok
48+
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2
49+
50+
query IIII rowsort
51+
select * from v;
52+
----
53+
1 11 NULL NULL
54+
1 11 NULL NULL
55+
2 22 2 22
56+
57+
statement ok
58+
drop materialized view v;
59+
60+
statement ok
61+
drop table stream;
62+
63+
statement ok
64+
drop table version;
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
2+
- name: Left join type for temporal join
3+
sql: |
4+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
5+
create table version(id2 int, a2 int, b2 int, primary key (id2));
6+
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1= id2
7+
stream_plan: |
8+
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
9+
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
10+
├─StreamExchange { dist: HashShard(stream.id1) }
11+
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
12+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
13+
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
14+
batch_error: |-
15+
Not supported: do not support temporal join for batch queries
16+
HINT: please use temporal join in streaming queries
17+
- name: Inner join type for temporal join
18+
sql: |
19+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
20+
create table version(id2 int, a2 int, b2 int, primary key (id2));
21+
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
22+
stream_plan: |
23+
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
24+
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
25+
├─StreamExchange { dist: HashShard(stream.id1) }
26+
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
27+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
28+
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
29+
- name: implicit join with temporal tables
30+
sql: |
31+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
32+
create table version(id2 int, a2 int, b2 int, primary key (id2));
33+
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF NOW() where id1 = id2 AND a2 < 10;
34+
stream_plan: |
35+
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
36+
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
37+
├─StreamExchange { dist: HashShard(stream.id1) }
38+
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
39+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
40+
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
41+
- name: Multi join key for temporal join
42+
sql: |
43+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
44+
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
45+
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on a1 = a2 and id1 = id2 where b2 != a2;
46+
stream_plan: |
47+
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, a2, id1, a1], pk_conflict: "no check" }
48+
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
49+
├─StreamExchange { dist: HashShard(stream.id1, stream.a1) }
50+
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
51+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2, version.a2) }
52+
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2], pk: [version.id2, version.a2], dist: UpstreamHashShard(version.id2, version.a2) }
53+
- name: Temporal join with Aggregation
54+
sql: |
55+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
56+
create table version(id2 int, a2 int, b2 int, primary key (id2));
57+
select count(*) from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
58+
stream_plan: |
59+
StreamMaterialize { columns: [count], pk_columns: [], pk_conflict: "no check" }
60+
└─StreamProject { exprs: [sum0(count)] }
61+
└─StreamAppendOnlyGlobalSimpleAgg { aggs: [sum0(count), count] }
62+
└─StreamExchange { dist: Single }
63+
└─StreamStatelessLocalSimpleAgg { aggs: [count] }
64+
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] }
65+
├─StreamExchange { dist: HashShard(stream.id1) }
66+
| └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
67+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
68+
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
69+
- name: Temporal join join keys requirement test
70+
sql: |
71+
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
72+
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
73+
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
74+
stream_error: |-
75+
Not supported: Temporal join requires the lookup table's primary key contained exactly in the equivalence condition
76+
HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions
77+
- name: Temporal join append only test
78+
sql: |
79+
create table stream(id1 int, a1 int, b1 int);
80+
create table version(id2 int, a2 int, b2 int, primary key (id2));
81+
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
82+
stream_error: |-
83+
Not supported: Temporal join requires a append-only left input
84+
HINT: Please ensure your left input is append-only
85+
- name: Temporal join type test
86+
sql: |
87+
create table stream(id1 int, a1 int, b1 int);
88+
create table version(id2 int, a2 int, b2 int, primary key (id2));
89+
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
90+
stream_error: |-
91+
Not supported: exist dangling temporal scan
92+
HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used.
93+
- name: multi-way temporal join with the same key
94+
sql: |
95+
create table stream(k int, a1 int, b1 int) APPEND ONLY;
96+
create table version1(k int, x1 int, y2 int, primary key (k));
97+
create table version2(k int, x2 int, y2 int, primary key (k));
98+
select stream.k, x1, x2, a1, b1
99+
from stream
100+
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.k = version1.k
101+
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.k = version2.k where a1 < 10;
102+
stream_plan: |
103+
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], pk_columns: [stream._row_id, version1.k, k, version2.k], pk_conflict: "no check" }
104+
└─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] }
105+
├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
106+
| ├─StreamExchange { dist: HashShard(stream.k) }
107+
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
108+
| | └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
109+
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.k) }
110+
| └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) }
111+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.k) }
112+
└─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) }
113+
- name: multi-way temporal join with different keys
114+
sql: |
115+
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
116+
create table version1(id1 int, x1 int, y2 int, primary key (id1));
117+
create table version2(id2 int, x2 int, y2 int, primary key (id2));
118+
select stream.id1, x1, stream.id2, x2, a1, b1
119+
from stream
120+
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
121+
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
122+
stream_plan: |
123+
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
124+
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
125+
├─StreamExchange { dist: HashShard(stream.id2) }
126+
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
127+
| ├─StreamExchange { dist: HashShard(stream.id1) }
128+
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
129+
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
130+
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
131+
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
132+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
133+
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }
134+
- name: multi-way temporal join with different keys
135+
sql: |
136+
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
137+
create table version1(id1 int, x1 int, y2 int, primary key (id1));
138+
create table version2(id2 int, x2 int, y2 int, primary key (id2));
139+
select stream.id1, x1, stream.id2, x2, a1, b1
140+
from stream
141+
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
142+
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
143+
stream_plan: |
144+
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
145+
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
146+
├─StreamExchange { dist: HashShard(stream.id2) }
147+
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
148+
| ├─StreamExchange { dist: HashShard(stream.id1) }
149+
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
150+
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
151+
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
152+
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
153+
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
154+
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }

src/frontend/src/binder/relation/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ impl Binder {
258258
&mut self,
259259
name: ObjectName,
260260
alias: Option<TableAlias>,
261+
for_system_time_as_of_now: bool,
261262
) -> Result<Relation> {
262263
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
263264
if schema_name.is_none() && let Some(item) = self.context.cte_to_relation.get(&table_name) {
@@ -293,7 +294,7 @@ impl Binder {
293294
Ok(share_relation)
294295
} else {
295296

296-
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias)
297+
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_now)
297298
}
298299
}
299300

@@ -313,7 +314,7 @@ impl Binder {
313314
}?;
314315

315316
Ok((
316-
self.bind_relation_by_name(table_name.clone(), None)?,
317+
self.bind_relation_by_name(table_name.clone(), None, false)?,
317318
table_name,
318319
))
319320
}
@@ -358,12 +359,16 @@ impl Binder {
358359
.map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string());
359360

360361
let table_name = self.catalog.get_table_name_by_id(table_id)?;
361-
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias)
362+
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false)
362363
}
363364

364365
pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
365366
match table_factor {
366-
TableFactor::Table { name, alias } => self.bind_relation_by_name(name, alias),
367+
TableFactor::Table {
368+
name,
369+
alias,
370+
for_system_time_as_of_now,
371+
} => self.bind_relation_by_name(name, alias, for_system_time_as_of_now),
367372
TableFactor::TableFunction { name, alias, args } => {
368373
let func_name = &name.0[0].real_value();
369374
if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) {
@@ -378,6 +383,7 @@ impl Binder {
378383
Some(PG_CATALOG_SCHEMA_NAME),
379384
PG_KEYWORDS_TABLE_NAME,
380385
alias,
386+
false,
381387
)
382388
} else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) {
383389
let args: Vec<ExprImpl> = args

src/frontend/src/binder/relation/table_or_source.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct BoundBaseTable {
3737
pub table_id: TableId,
3838
pub table_catalog: TableCatalog,
3939
pub table_indexes: Vec<Arc<IndexCatalog>>,
40+
pub for_system_time_as_of_now: bool,
4041
}
4142

4243
#[derive(Debug, Clone)]
@@ -63,6 +64,7 @@ impl Binder {
6364
schema_name: Option<&str>,
6465
table_name: &str,
6566
alias: Option<TableAlias>,
67+
for_system_time_as_of_now: bool,
6668
) -> Result<Relation> {
6769
fn is_system_schema(schema_name: &str) -> bool {
6870
SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)
@@ -126,7 +128,11 @@ impl Binder {
126128
self.catalog
127129
.get_table_by_name(&self.db_name, schema_path, table_name)
128130
{
129-
self.resolve_table_relation(table_catalog, schema_name)?
131+
self.resolve_table_relation(
132+
table_catalog,
133+
schema_name,
134+
for_system_time_as_of_now,
135+
)?
130136
} else if let Ok((source_catalog, _)) =
131137
self.catalog
132138
.get_source_by_name(&self.db_name, schema_path, table_name)
@@ -167,7 +173,11 @@ impl Binder {
167173
self.catalog.get_schema_by_name(&self.db_name, schema_name)
168174
{
169175
if let Some(table_catalog) = schema.get_table_by_name(table_name) {
170-
return self.resolve_table_relation(table_catalog, schema_name);
176+
return self.resolve_table_relation(
177+
table_catalog,
178+
schema_name,
179+
for_system_time_as_of_now,
180+
);
171181
} else if let Some(source_catalog) =
172182
schema.get_source_by_name(table_name)
173183
{
@@ -194,6 +204,7 @@ impl Binder {
194204
&self,
195205
table_catalog: &TableCatalog,
196206
schema_name: &str,
207+
for_system_time_as_of_now: bool,
197208
) -> Result<(Relation, Vec<(bool, Field)>)> {
198209
let table_id = table_catalog.id();
199210
let table_catalog = table_catalog.clone();
@@ -208,6 +219,7 @@ impl Binder {
208219
table_id,
209220
table_catalog,
210221
table_indexes,
222+
for_system_time_as_of_now,
211223
};
212224

213225
Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
@@ -291,6 +303,7 @@ impl Binder {
291303
table_id,
292304
table_catalog,
293305
table_indexes,
306+
for_system_time_as_of_now: false,
294307
})
295308
}
296309

src/frontend/src/binder/select.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ impl Binder {
309309
Some(PG_CATALOG_SCHEMA_NAME),
310310
PG_USER_TABLE_NAME,
311311
None,
312+
false,
312313
)?);
313314
let where_clause = Some(
314315
FunctionCall::new(

src/frontend/src/binder/update.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl Binder {
7373
let owner = table_catalog.owner;
7474
let table_version_id = table_catalog.version_id().expect("table must be versioned");
7575

76-
let table = self.bind_relation_by_name(name, None)?;
76+
let table = self.bind_relation_by_name(name, None, false)?;
7777

7878
let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?;
7979

src/frontend/src/handler/create_index.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ fn assemble_materialize(
286286
// Index table has no indexes.
287287
vec![],
288288
context,
289+
false,
289290
);
290291

291292
let exprs = index_columns

0 commit comments

Comments
 (0)