Skip to content

Commit 8648ff8

Browse files
authored
refactor(simulation): interface of running multiple queries in the same session (risingwavelabs#8697)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 6313c42 commit 8648ff8

File tree

6 files changed

+175
-144
lines changed

6 files changed

+175
-144
lines changed

src/tests/simulation/src/cluster.rs

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ use std::collections::HashMap;
1616
use std::future::Future;
1717
use std::io::Write;
1818
use std::path::PathBuf;
19-
use std::sync::LazyLock;
19+
use std::sync::{Arc, LazyLock};
2020
use std::time::Duration;
2121

2222
use anyhow::{bail, Result};
2323
use clap::Parser;
24+
use futures::channel::{mpsc, oneshot};
2425
use futures::future::join_all;
26+
use futures::{SinkExt, StreamExt};
2527
use madsim::net::ipvs::*;
2628
use madsim::runtime::{Handle, NodeHandle};
29+
use madsim::task::JoinHandle;
2730
use rand::Rng;
2831
use sqllogictest::AsyncDB;
2932

@@ -308,35 +311,47 @@ impl Cluster {
308311
})
309312
}
310313

311-
/// Run a SQL query from the client.
312-
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
313-
let sql = sql.into();
314+
/// Start a SQL session on the client node.
315+
pub fn start_session(&mut self) -> Session {
316+
let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
317+
318+
self.client.spawn(async move {
319+
let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;
314320

315-
let result = self
316-
.client
317-
.spawn(async move {
318-
// TODO: reuse session
319-
let mut session = RisingWave::connect("frontend".into(), "dev".into())
321+
while let Some((sql, tx)) = query_rx.next().await {
322+
let result = client
323+
.run(&sql)
320324
.await
321-
.expect("failed to connect to RisingWave");
322-
let result = session.run(&sql).await?;
323-
Ok::<_, anyhow::Error>(result)
324-
})
325-
.await??;
326-
327-
match result {
328-
sqllogictest::DBOutput::Rows { rows, .. } => Ok(rows
329-
.into_iter()
330-
.map(|row| {
331-
row.into_iter()
332-
.map(|v| v.to_string())
333-
.collect::<Vec<_>>()
334-
.join(" ")
335-
})
336-
.collect::<Vec<_>>()
337-
.join("\n")),
338-
_ => Ok("".to_string()),
339-
}
325+
.map(|output| match output {
326+
sqllogictest::DBOutput::Rows { rows, .. } => rows
327+
.into_iter()
328+
.map(|row| {
329+
row.into_iter()
330+
.map(|v| v.to_string())
331+
.collect::<Vec<_>>()
332+
.join(" ")
333+
})
334+
.collect::<Vec<_>>()
335+
.join("\n"),
336+
_ => "".to_string(),
337+
})
338+
.map_err(Into::into);
339+
340+
let _ = tx.send(result);
341+
}
342+
343+
Ok::<_, anyhow::Error>(())
344+
});
345+
346+
Session { query_tx }
347+
}
348+
349+
/// Run a SQL query on a **new** session of the client node.
350+
///
351+
/// This is a convenience method that creates a new session and runs the query on it. If you
352+
/// want to run multiple queries on the same session, use `start_session` and `Session::run`.
353+
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
354+
self.start_session().run(sql).await
340355
}
341356

342357
/// Run a future on the client node.
@@ -517,6 +532,26 @@ impl Cluster {
517532
}
518533
}
519534

535+
type SessionRequest = (
536+
String, // query sql
537+
oneshot::Sender<Result<String>>, // channel to send result back
538+
);
539+
540+
/// A SQL session on the simulated client node.
541+
#[derive(Debug, Clone)]
542+
pub struct Session {
543+
query_tx: mpsc::Sender<SessionRequest>,
544+
}
545+
546+
impl Session {
547+
/// Run the given SQL query on the session.
548+
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
549+
let (tx, rx) = oneshot::channel();
550+
self.query_tx.send((sql.into(), tx)).await?;
551+
rx.await?
552+
}
553+
}
554+
520555
/// Options for killing nodes.
521556
#[derive(Debug, Clone, Copy, PartialEq)]
522557
pub struct KillOpts {

src/tests/simulation/tests/it/cascade_materialized_view.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ const MV5: &str = "create materialized view m5 as select * from m4;";
3333
#[madsim::test]
3434
async fn test_simple_cascade_materialized_view() -> Result<()> {
3535
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
36+
let mut session = cluster.start_session();
3637

37-
cluster.run(ROOT_TABLE_CREATE).await?;
38-
cluster.run(MV1).await?;
38+
session.run(ROOT_TABLE_CREATE).await?;
39+
session.run(MV1).await?;
3940

4041
let fragment = cluster
4142
.locate_one_fragment([
@@ -62,17 +63,17 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
6263
fragment.inner.actors.len()
6364
);
6465

65-
cluster
66+
session
6667
.run(&format!(
6768
"insert into t1 values {}",
6869
(1..=10).map(|x| format!("({x})")).join(",")
6970
))
7071
.await?;
7172

72-
cluster.run("flush").await?;
73+
session.run("flush").await?;
7374

7475
// v1 > 5, result is [6, 7, 8, 9, 10]
75-
cluster
76+
session
7677
.run("select count(*) from m1")
7778
.await?
7879
.assert_result_eq("5");
@@ -92,21 +93,21 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
9293
fragment.inner.actors.len()
9394
);
9495

95-
cluster
96+
session
9697
.run("select count(*) from m1")
9798
.await?
9899
.assert_result_eq("5");
99100

100-
cluster
101+
session
101102
.run(&format!(
102103
"insert into t1 values {}",
103104
(11..=20).map(|x| format!("({x})")).join(",")
104105
))
105106
.await?;
106107

107-
cluster.run("flush").await?;
108+
session.run("flush").await?;
108109
// 10 < v1 < 15, result is [11, 12, 13, 14]
109-
cluster
110+
session
110111
.run("select count(*) from m1")
111112
.await?
112113
.assert_result_eq("15");
@@ -117,13 +118,14 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
117118
#[madsim::test]
118119
async fn test_diamond_cascade_materialized_view() -> Result<()> {
119120
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
121+
let mut session = cluster.start_session();
120122

121-
cluster.run(ROOT_TABLE_CREATE).await?;
122-
cluster.run(MV1).await?;
123-
cluster.run(MV2).await?;
124-
cluster.run(MV3).await?;
125-
cluster.run(MV4).await?;
126-
cluster.run(MV5).await?;
123+
session.run(ROOT_TABLE_CREATE).await?;
124+
session.run(MV1).await?;
125+
session.run(MV2).await?;
126+
session.run(MV3).await?;
127+
session.run(MV4).await?;
128+
session.run(MV5).await?;
127129

128130
let fragment = cluster
129131
.locate_one_fragment([
@@ -141,15 +143,15 @@ async fn test_diamond_cascade_materialized_view() -> Result<()> {
141143
let fragment = cluster.locate_fragment_by_id(id).await?;
142144
assert_eq!(fragment.inner.actors.len(), 1);
143145

144-
cluster
146+
session
145147
.run(&format!(
146148
"insert into t1 values {}",
147149
(1..=10).map(|x| format!("({x})")).join(",")
148150
))
149151
.await?;
150152

151-
cluster.run("flush").await?;
152-
cluster
153+
session.run("flush").await?;
154+
session
153155
.run("select count(*) from m5")
154156
.await?
155157
.assert_result_eq("0");
@@ -160,20 +162,20 @@ async fn test_diamond_cascade_materialized_view() -> Result<()> {
160162
let fragment = cluster.locate_fragment_by_id(id).await?;
161163
assert_eq!(fragment.inner.actors.len(), 6);
162164

163-
cluster
165+
session
164166
.run("select count(*) from m5")
165167
.await?
166168
.assert_result_eq("0");
167169

168-
cluster
170+
session
169171
.run(&format!(
170172
"insert into t1 values {}",
171173
(11..=20).map(|x| format!("({x})")).join(",")
172174
))
173175
.await?;
174176

175-
cluster.run("flush").await?;
176-
cluster
177+
session.run("flush").await?;
178+
session
177179
.run("select count(*) from m5")
178180
.await?
179181
.assert_result_eq("4");

src/tests/simulation/tests/it/dynamic_filter.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ const SELECT: &str = "select * from mv1 order by v1;";
2828
#[madsim::test]
2929
async fn test_dynamic_filter() -> Result<()> {
3030
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
31+
let mut session = cluster.start_session();
3132

32-
cluster.run("create table t1 (v1 int);").await?;
33-
cluster.run("create table t2 (v2 int);").await?;
34-
cluster.run("create materialized view mv1 as with max_v2 as (select max(v2) max from t2) select v1 from t1, max_v2 where v1 > max;").await?;
35-
cluster.run("insert into t1 values (1), (2), (3)").await?;
36-
cluster.run("flush").await?;
33+
session.run("create table t1 (v1 int);").await?;
34+
session.run("create table t2 (v2 int);").await?;
35+
session.run("create materialized view mv1 as with max_v2 as (select max(v2) max from t2) select v1 from t1, max_v2 where v1 > max;").await?;
36+
session.run("insert into t1 values (1), (2), (3)").await?;
37+
session.run("flush").await?;
3738
sleep(Duration::from_secs(5)).await;
3839

3940
let dynamic_filter_fragment = cluster
@@ -60,53 +61,53 @@ async fn test_dynamic_filter() -> Result<()> {
6061
cluster.reschedule(format!("{id}-[1,2,3]")).await?;
6162
sleep(Duration::from_secs(3)).await;
6263

63-
cluster.run(SELECT).await?.assert_result_eq("");
64-
cluster.run("insert into t2 values (0)").await?;
65-
cluster.run("flush").await?;
64+
session.run(SELECT).await?.assert_result_eq("");
65+
session.run("insert into t2 values (0)").await?;
66+
session.run("flush").await?;
6667
sleep(Duration::from_secs(5)).await;
67-
cluster.run(SELECT).await?.assert_result_eq("1\n2\n3");
68+
session.run(SELECT).await?.assert_result_eq("1\n2\n3");
6869
// 1
6970
// 2
7071
// 3
7172

7273
cluster.reschedule(format!("{id}-[4,5]+[1,2,3]")).await?;
7374
sleep(Duration::from_secs(3)).await;
74-
cluster.run(SELECT).await?.assert_result_eq("1\n2\n3");
75+
session.run(SELECT).await?.assert_result_eq("1\n2\n3");
7576

76-
cluster.run("insert into t2 values (2)").await?;
77-
cluster.run("flush").await?;
77+
session.run("insert into t2 values (2)").await?;
78+
session.run("flush").await?;
7879
sleep(Duration::from_secs(5)).await;
79-
cluster.run(SELECT).await?.assert_result_eq("3");
80+
session.run(SELECT).await?.assert_result_eq("3");
8081
// 3
8182

8283
cluster.reschedule(format!("{id}-[1,2,3]+[4,5]")).await?;
8384
sleep(Duration::from_secs(3)).await;
84-
cluster.run(SELECT).await?.assert_result_eq("3");
85+
session.run(SELECT).await?.assert_result_eq("3");
8586

86-
cluster.run("update t2 set v2 = 1 where v2 = 2").await?;
87-
cluster.run("flush").await?;
87+
session.run("update t2 set v2 = 1 where v2 = 2").await?;
88+
session.run("flush").await?;
8889
sleep(Duration::from_secs(5)).await;
89-
cluster.run(SELECT).await?.assert_result_eq("2\n3");
90+
session.run(SELECT).await?.assert_result_eq("2\n3");
9091
// 2
9192
// 3
9293
//
9394
cluster.reschedule(format!("{id}+[1,2,3]")).await?;
9495
sleep(Duration::from_secs(3)).await;
95-
cluster.run(SELECT).await?.assert_result_eq("2\n3");
96+
session.run(SELECT).await?.assert_result_eq("2\n3");
9697

97-
cluster.run("delete from t2 where true").await?;
98-
cluster.run("flush").await?;
98+
session.run("delete from t2 where true").await?;
99+
session.run("flush").await?;
99100
sleep(Duration::from_secs(5)).await;
100-
cluster.run(SELECT).await?.assert_result_eq("");
101+
session.run(SELECT).await?.assert_result_eq("");
101102

102103
cluster.reschedule(format!("{id}-[1]")).await?;
103104
sleep(Duration::from_secs(3)).await;
104-
cluster.run(SELECT).await?.assert_result_eq("");
105+
session.run(SELECT).await?.assert_result_eq("");
105106

106-
cluster.run("insert into t2 values (1)").await?;
107-
cluster.run("flush").await?;
107+
session.run("insert into t2 values (1)").await?;
108+
session.run("flush").await?;
108109
sleep(Duration::from_secs(5)).await;
109-
cluster.run(SELECT).await?.assert_result_eq("2\n3");
110+
session.run(SELECT).await?.assert_result_eq("2\n3");
110111

111112
Ok(())
112113
}

src/tests/simulation/tests/it/nexmark_chaos.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,17 @@ async fn nexmark_chaos_common_inner(
4141
) -> Result<()> {
4242
let mut cluster =
4343
NexmarkCluster::new(Configuration::for_scale(), 6, Some(20 * THROUGHPUT), false).await?;
44-
cluster.run(create).await?;
44+
let mut session = cluster.start_session();
45+
session.run(create).await?;
4546
sleep(Duration::from_secs(30)).await;
46-
let final_result = cluster.run(select).await?;
47-
cluster.run(drop).await?;
47+
let final_result = session.run(select).await?;
48+
session.run(drop).await?;
4849
sleep(Duration::from_secs(5)).await;
4950

5051
println!("Reference run done.");
51-
52-
cluster.run(create).await?;
52+
// Create a new session for the chaos run.
53+
let mut session = cluster.start_session();
54+
session.run(create).await?;
5355

5456
let _initial_result = cluster
5557
.wait_until_non_empty(select, initial_interval, initial_timeout)
@@ -68,7 +70,7 @@ async fn nexmark_chaos_common_inner(
6870
cluster.reschedule(join_plans(fragments)).await?;
6971

7072
sleep(after_scale_duration).await;
71-
cluster.run(select).await?.assert_result_ne(&final_result);
73+
session.run(select).await?.assert_result_ne(&final_result);
7274

7375
let fragments = cluster.locate_random_fragments().await?;
7476
cluster.reschedule(join_plans(fragments)).await?;
@@ -78,15 +80,15 @@ async fn nexmark_chaos_common_inner(
7880
cluster.reschedule(fragment.random_reschedule()).await?;
7981

8082
sleep(after_scale_duration).await;
81-
cluster.run(select).await?.assert_result_ne(&final_result);
83+
session.run(select).await?.assert_result_ne(&final_result);
8284

8385
let fragment = cluster.locate_fragment_by_id(id).await?;
8486
cluster.reschedule(fragment.random_reschedule()).await?;
8587
}
8688

8789
sleep(Duration::from_secs(50)).await;
8890

89-
cluster.run(select).await?.assert_result_eq(&final_result);
91+
session.run(select).await?.assert_result_eq(&final_result);
9092

9193
Ok(())
9294
}

0 commit comments

Comments
 (0)