Skip to content

Commit 320d755

Browse files
test(stream): add state cleaning test (risingwavelabs#8546)
Signed-off-by: TennyZhuang <[email protected]> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 8b09f5e commit 320d755

File tree

8 files changed

+366
-6
lines changed

8 files changed

+366
-6
lines changed

Cargo.lock

Lines changed: 31 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ members = [
3535
"src/tests/regress",
3636
"src/tests/simulation",
3737
"src/tests/sqlsmith",
38+
"src/tests/state_cleaning_test",
3839
"src/tracing",
3940
"src/udf",
4041
"src/utils/local_stats_alloc",

src/stream/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ enum Inner {
3636
#[error("Array/Chunk error: {0}")]
3737
Array(ArrayError),
3838

39-
#[error("Executor error: {0}")]
39+
#[error("Executor error: {0:?}")]
4040
Executor(Box<StreamExecutorError>),
4141

4242
#[error(transparent)]
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
[package]
2+
name = "risingwave_state_cleaning_test"
3+
version = { workspace = true }
4+
edition = { workspace = true }
5+
homepage = { workspace = true }
6+
keywords = { workspace = true }
7+
license = { workspace = true }
8+
repository = { workspace = true }
9+
10+
[package.metadata.cargo-machete]
11+
ignored = ["workspace-hack"]
12+
13+
[package.metadata.cargo-udeps.ignore]
14+
normal = ["workspace-hack"]
15+
16+
[dependencies]
17+
anyhow = "1"
18+
chrono = "0.4"
19+
clap = { version = "4", features = ["derive"] }
20+
futures = { version = "0.3", default-features = false, features = ["alloc"] }
21+
itertools = "0.10"
22+
regex = "1"
23+
risingwave_rt = { path = "../../utils/runtime" }
24+
serde = { version = "1", features = ["derive"] }
25+
serde_with = "2"
26+
tokio = { version = "0.2", package = "madsim-tokio" }
27+
tokio-postgres = "0.7.7"
28+
tokio-stream = { version = "0.1", features = ["fs"] }
29+
toml = "0.4"
30+
tracing = "0.1"
31+
32+
[target.'cfg(not(madsim))'.dependencies]
33+
workspace-hack = { path = "../../workspace-hack" }
34+
35+
[[bin]]
36+
name = "risingwave_state_cleaning_test"
37+
path = "src/bin/main.rs"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# risingwave_state_cleaning_test
2+
3+
The `risingwave_state_cleaning_test` crate has been designed specifically to test whether RisingWave can effectively clean outdated state records prior to reaching the watermark on time. Its functionality is described using TOML files, which specify the tests that should be executed. By utilizing this crate, developers can ensure that RisingWave is capable of properly managing state records, thereby improving overall application performance and providing a more reliable end-user experience.
4+
5+
## TOML files
6+
7+
The TOML files describe the tests that should be run. Each test is represented as a table in the TOML file with the following format:
8+
9+
```toml
10+
[[test]]
11+
name = "test name" # A human-readable name for the test
12+
init_sqls = [ "SQL statement 1", "SQL statement 2", ... ] # A list of SQL statements to prepare the test environment
13+
bound_tables = [
14+
{ pattern = "table name pattern", limit = number }, # A pattern to match table names and a limit on the number of rows for each table
15+
{ pattern = "table name pattern", limit = number },
16+
...
17+
] # A list of tables that should be checked.
18+
```
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
[[test]]
2+
name = "window_hash_agg"
3+
# Prepare the tesing table & mviews.
4+
init_sqls = [
5+
# Set up the base table.
6+
"""
7+
CREATE TABLE t1 (
8+
created_at timestamp,
9+
grp int,
10+
v int,
11+
WATERMARK FOR created_at AS created_at - interval '9' second
12+
) APPEND ONLY WITH (
13+
connector = 'datagen',
14+
rows_per_second = 100,
15+
datagen.split.num = 16,
16+
fields.created_at.max_past_mode = 'relative',
17+
fields.created_at.max_past = '10s',
18+
fields.grp.min = 0,
19+
fields.grp.max = 5,
20+
);
21+
""",
22+
# Set up the tumble window mview.
23+
"""
24+
CREATE MATERIALIZED VIEW mv_tumble AS
25+
SELECT grp, SUM(v), window_start
26+
FROM tumble(t1, created_at, INTERVAL '1' SECOND)
27+
GROUP BY window_start, grp;
28+
""",
29+
# Set up the hop window mview.
30+
"""
31+
CREATE MATERIALIZED VIEW mv_hop AS
32+
SELECT grp, SUM(v), window_start
33+
FROM hop(t1, created_at, INTERVAL '1' SECOND, INTERVAL '3' SECOND)
34+
GROUP BY window_start, grp;
35+
""",
36+
]
37+
bound_tables = [
38+
# Tumble window agg state table.
39+
{ pattern = '__internal_mv_tumble_\d+_hashaggresult_\d+', limit = 200 },
40+
# Hop window agg state table.
41+
{ pattern = '__internal_mv_hop_\d+_hashaggresult_\d+', limit = 400 },
42+
]

0 commit comments

Comments
 (0)