Skip to content

Commit 2ae019d

Browse files
test(recovery): add recovery test for nexmark stream (risingwavelabs#7623)
Signed-off-by: Runji Wang <[email protected]> Co-authored-by: Liang <[email protected]>
1 parent 25499e3 commit 2ae019d

File tree

4 files changed

+101
-1
lines changed

4 files changed

+101
-1
lines changed

Makefile.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ script = """
700700
#!/usr/bin/env bash
701701
set -e
702702
703-
cargo check -p risingwave_simulation "$@"
703+
cargo check -p risingwave_simulation --all-targets "$@"
704704
"""
705705

706706
[tasks.sslt]

src/tests/simulation/src/cluster.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ pub struct Cluster {
109109
}
110110

111111
impl Cluster {
112+
/// Start a RisingWave cluster for testing.
113+
///
114+
/// This function should be called exactly once in a test.
112115
pub async fn start(conf: Configuration) -> Result<Self> {
113116
let handle = madsim::runtime::Handle::current();
114117
println!("seed = {}", handle.seed());
@@ -514,6 +517,7 @@ impl Cluster {
514517
}
515518
}
516519

520+
/// Options for killing nodes.
517521
#[derive(Debug, Clone, Copy, PartialEq)]
518522
pub struct KillOpts {
519523
pub kill_rate: f32,
@@ -522,3 +526,14 @@ pub struct KillOpts {
522526
pub kill_compute: bool,
523527
pub kill_compactor: bool,
524528
}
529+
530+
impl KillOpts {
531+
/// Killing all kind of nodes.
532+
pub const ALL: Self = KillOpts {
533+
kill_rate: 1.0,
534+
kill_meta: true,
535+
kill_frontend: true,
536+
kill_compute: true,
537+
kill_compactor: true,
538+
};
539+
}

src/tests/simulation/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod dynamic_filter;
2222
mod hello;
2323
mod nexmark_chaos;
2424
mod nexmark_q4;
25+
mod nexmark_recovery;
2526
mod nexmark_source;
2627
mod singleton_migration;
2728
mod streaming_parallelism;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#![cfg(madsim)]
16+
17+
use std::time::Duration;
18+
19+
use anyhow::Result;
20+
use madsim::time::{sleep, Instant};
21+
use risingwave_simulation::cluster::{Configuration, KillOpts};
22+
use risingwave_simulation::nexmark::{self, NexmarkCluster, THROUGHPUT};
23+
use risingwave_simulation::utils::AssertResult;
24+
25+
/// Setup a nexmark stream, inject failures, and verify results.
26+
async fn nexmark_recovery_common(create: &str, select: &str, drop: &str) -> Result<()> {
27+
// tracing_subscriber::fmt()
28+
// .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
29+
// .init();
30+
31+
let mut cluster =
32+
NexmarkCluster::new(Configuration::for_scale(), 6, Some(THROUGHPUT * 20), false).await?;
33+
34+
// get the output without failures as the standard result
35+
cluster.run(create).await?;
36+
sleep(Duration::from_secs(30)).await;
37+
let expected = cluster.run(select).await?;
38+
cluster.run(drop).await?;
39+
sleep(Duration::from_secs(5)).await;
40+
41+
cluster.run(create).await?;
42+
43+
// kill nodes and trigger recovery
44+
for _ in 0..5 {
45+
sleep(Duration::from_secs(2)).await;
46+
cluster.kill_node(&KillOpts::ALL).await;
47+
}
48+
// wait enough time to make sure the stream is end
49+
sleep(Duration::from_secs(60)).await;
50+
51+
cluster.run(select).await?.assert_result_eq(&expected);
52+
53+
Ok(())
54+
}
55+
56+
macro_rules! test {
57+
($query:ident) => {
58+
paste::paste! {
59+
#[madsim::test]
60+
async fn [< nexmark_recovery_ $query >]() -> Result<()> {
61+
use risingwave_simulation::nexmark::queries::$query::*;
62+
nexmark_recovery_common(CREATE, SELECT, DROP)
63+
.await
64+
}
65+
}
66+
};
67+
}
68+
69+
// q0, q1, q2: too trivial
70+
test!(q3);
71+
test!(q4);
72+
test!(q5);
73+
// q6: cannot plan
74+
test!(q7);
75+
test!(q8);
76+
test!(q9);
77+
// q10+: duplicated or unsupported
78+
79+
// Self made queries.
80+
test!(q101);
81+
test!(q102);
82+
test!(q103);
83+
test!(q104);
84+
test!(q105);

0 commit comments

Comments
 (0)