Skip to content

Commit 5dd79c1

Browse files
committed
fix tests
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 24c0bd1 commit 5dd79c1

File tree

2 files changed

+55
-11
lines changed

2 files changed

+55
-11
lines changed

src/tests/simulation/src/cluster.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,8 +472,8 @@ impl Cluster {
472472
.await
473473
}
474474

475-
/// Kill some nodes and restart them in 2s + restart_delay_secs with a probability of 0.1.
476-
#[cfg_or_panic(madsim)]
475+
/// Generate a list of random worker nodes to kill by `opts`, then call `kill_nodes` to kill and
476+
/// restart them.
477477
pub async fn kill_node(&self, opts: &KillOpts) {
478478
let mut nodes = vec![];
479479
if opts.kill_meta {
@@ -528,7 +528,20 @@ impl Cluster {
528528
nodes.push(format!("compactor-{}", i));
529529
}
530530
}
531-
join_all(nodes.iter().map(|name| async move {
531+
532+
self.kill_nodes(nodes, opts.restart_delay_secs).await
533+
}
534+
535+
/// Kill the given nodes by their names and restart them in 2s + restart_delay_secs with a
536+
/// probability of 0.1.
537+
#[cfg_or_panic(madsim)]
538+
pub async fn kill_nodes(
539+
&self,
540+
nodes: impl IntoIterator<Item = impl AsRef<str>>,
541+
restart_delay_secs: u32,
542+
) {
543+
join_all(nodes.into_iter().map(|name| async move {
544+
let name = name.as_ref();
532545
let t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1));
533546
tokio::time::sleep(t).await;
534547
tracing::info!("kill {name}");
@@ -540,7 +553,7 @@ impl Cluster {
540553
// so that the node is expired and removed from the cluster
541554
if rand::thread_rng().gen_bool(0.1) {
542555
// max_heartbeat_interval_secs = 15
543-
t += Duration::from_secs(opts.restart_delay_secs as u64);
556+
t += Duration::from_secs(restart_delay_secs as u64);
544557
}
545558
tokio::time::sleep(t).await;
546559
tracing::info!("restart {name}");

src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use std::time::Duration;
216

317
use anyhow::Result;
4-
use risingwave_simulation::cluster::{Configuration, KillOpts};
18+
use risingwave_simulation::cluster::Configuration;
519
use risingwave_simulation::nexmark::NexmarkCluster;
620
use risingwave_simulation::utils::AssertResult;
721
use tokio::time::{sleep, timeout};
@@ -24,7 +38,16 @@ enum ResumeBy {
2438
}
2539

2640
async fn test_impl(resume_by: ResumeBy) -> Result<()> {
27-
let mut cluster = NexmarkCluster::new(Configuration::for_scale(), 6, None, false).await?;
41+
let mut cluster = NexmarkCluster::new(
42+
Configuration {
43+
meta_nodes: 1,
44+
..Configuration::for_scale()
45+
},
46+
6,
47+
None,
48+
false,
49+
)
50+
.await?;
2851

2952
cluster.run(SET_PARAMETER).await?;
3053
cluster.run(CREATE).await?;
@@ -33,8 +56,8 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
3356
// Run for a while.
3457
sleep(Duration::from_secs(10)).await;
3558

36-
// Kill all nodes and wait for the service to recover.
37-
cluster.kill_node(&KillOpts::ALL).await;
59+
// Kill the meta node and wait for the service to recover.
60+
cluster.kill_nodes(["meta-1"], 0).await;
3861
sleep(Duration::from_secs(10)).await;
3962

4063
// The source should be paused.
@@ -64,20 +87,28 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
6487

6588
match resume_by {
6689
ResumeBy::Risectl => cluster.resume().await?,
67-
ResumeBy::Restart => cluster.kill_node(&KillOpts::ALL).await,
90+
ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
6891
}
6992
sleep(Duration::from_secs(10)).await;
7093

7194
// The source should be resumed.
7295
let new_count = cluster.run(SELECT).await?;
7396
assert_ne!(count, new_count);
7497

75-
// DML on tables should be allowed.
98+
// DML on tables should be allowed. However, we're uncertain whether the previous blocked DML is
99+
// executed or not. So we just check the count difference.
76100
{
77101
let mut session = cluster.start_session();
102+
103+
session.run("FLUSH").await?;
104+
let count: i64 = session.run(SELECT_COUNT_TABLE).await?.parse().unwrap();
105+
78106
session.run(INSERT_INTO_TABLE).await?;
79107
session.run("FLUSH").await?;
80-
session.run(SELECT_COUNT_TABLE).await?.assert_result_eq("1");
108+
session
109+
.run(SELECT_COUNT_TABLE)
110+
.await?
111+
.assert_result_eq(format!("{}", count + 1));
81112
}
82113

83114
Ok(())

0 commit comments

Comments
 (0)