Skip to content

test(stream): add join and temporal_filter state-cleaning test #8596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/tests/state_cleaning_test/data/join.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# State-cleaning test case: two append-only datagen sources are bucketed into
# 1-second tumbling windows and joined on (window_start, user_id).
[[test]]
name = "window_hash_join"
# SQL statements that set up the test, listed in order: the two source tables
# first, then the materialized view that joins them.
init_sqls = [
# Source 1: `orders`, generated at 100 rows per second. The watermark trails
# `created_at` by 9 seconds.
"""
CREATE TABLE orders (
order_id INTEGER,
user_id INTEGER,
amount INTEGER,
created_at TIMESTAMP,
WATERMARK FOR created_at AS created_at - interval '9' second
) APPEND ONLY WITH (
connector = 'datagen',
rows_per_second = 100,
datagen.split.num = 16,
fields.created_at.max_past_mode = 'relative',
fields.created_at.max_past = '10s',
fields.order_id.kind = 'sequence',
fields.order_id.start = 0,
fields.user_id.min = 0,
fields.user_id.max = 20,
fields.amount.min = 0,
fields.amount.max = 20,
);
""",
# Source 2: `clicks`, generated at 200 rows per second with the same 9-second
# watermark delay and the same `user_id` range (0..20) as `orders`.
"""
CREATE TABLE clicks (
click_id INTEGER,
user_id INTEGER,
created_at TIMESTAMP,
WATERMARK FOR created_at AS created_at - interval '9' second
) APPEND ONLY WITH (
connector = 'datagen',
rows_per_second = 200,
datagen.split.num = 16,
fields.created_at.max_past_mode = 'relative',
fields.created_at.max_past = '10s',
fields.click_id.kind = 'sequence',
fields.click_id.start = 0,
fields.user_id.min = 0,
fields.user_id.max = 20,
);
""",
# The view under test: both sources are tumbled on `created_at` into 1-second
# windows, then equi-joined on `window_start` and `user_id`.
"""
CREATE MATERIALIZED VIEW mv_tumble_join AS
SELECT clicks.window_start, clicks.user_id AS user_id
FROM
TUMBLE(orders, created_at, INTERVAL '1' second) AS orders
JOIN TUMBLE(clicks, created_at, INTERVAL '1' second) AS clicks
ON
orders.window_start = clicks.window_start AND
clicks.user_id = orders.user_id;
""",
]
# Bound applied to every table whose name matches `pattern`; the regex covers
# both the `hashjoinleft` and `hashjoinright` tables of `mv_tumble_join`.
# NOTE(review): `limit` is presumably the maximum number of rows each matching
# table may hold - confirm against the test runner.
bound_tables = { pattern = '__internal_mv_tumble_join_\d+_hashjoin(left|right)_\d+', limit = 300 }
45 changes: 45 additions & 0 deletions src/tests/state_cleaning_test/data/temporal_filter.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# State-cleaning test case: three materialized views apply a `now()`-based
# temporal filter of 10, 20 and 30 seconds to the same append-only source.
[[test]]
name = "temporal_filter"
# SQL statements that set up the test, listed in order: the source table, the
# session time zone, then the three filtered views.
init_sqls = [
# Source: `clicks`, generated at 200 rows per second. The watermark trails
# `created_at` by 9 seconds.
"""
CREATE TABLE clicks (
click_id INTEGER,
user_id INTEGER,
created_at TIMESTAMP,
WATERMARK FOR created_at AS created_at - interval '9' second
) APPEND ONLY WITH (
connector = 'datagen',
rows_per_second = 200,
datagen.split.num = 16,
fields.created_at.max_past_mode = 'relative',
fields.created_at.max_past = '10s',
fields.click_id.kind = 'sequence',
fields.click_id.start = 0,
fields.user_id.min = 0,
fields.user_id.max = 20,
);
""",
# The session time zone set here is used by now() in the views below.
"""
SET TIME ZONE LOCAL;
""",
# Keep only rows whose `created_at` falls within the last 10 seconds.
"""
CREATE MATERIALIZED VIEW clicks_10s AS
SELECT * FROM clicks WHERE created_at > now() - INTERVAL '10' second;
""",
# Same filter with a 20-second window.
"""
CREATE MATERIALIZED VIEW clicks_20s AS
SELECT * FROM clicks WHERE created_at > now() - INTERVAL '20' second;
""",
# Same filter with a 30-second window.
"""
CREATE MATERIALIZED VIEW clicks_30s AS
SELECT * FROM clicks WHERE created_at > now() - INTERVAL '30' second;
""",
]
# Bounds applied to tables whose names match each `pattern`. The limits for the
# `dynamicfilterleft` tables grow with the filter window: 300 / 600 / 900 for
# the 10s / 20s / 30s views.
# NOTE(review): `limit` appears to be a maximum record count per matching
# table - confirm against the test runner.
bound_tables = [
{ pattern = '__internal_clicks_10s_\d+_dynamicfilterleft_\d+', limit = 300 },
{ pattern = '__internal_clicks_20s_\d+_dynamicfilterleft_\d+', limit = 600 },
{ pattern = '__internal_clicks_30s_\d+_dynamicfilterleft_\d+', limit = 900 },
# The right table should only ever contain 1 record.
{ pattern = '__internal_clicks_\d+s_\d+_dynamicfilterright_\d+', limit = 1 },
]