Skip to content

Commit 3e8ce31

Browse files
chore: Add time for each slt record in simulation test (risingwavelabs#8724)
1 parent ea72ed7 commit 3e8ce31

File tree

6 files changed

+119
-3
lines changed

6 files changed

+119
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/tests/simulation/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ itertools = "0.10"
2323
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" }
2424
madsim = "0.2.17"
2525
paste = "1"
26+
pin-project = "1.0"
2627
pretty_assertions = "1"
2728
rand = "0.8"
2829
rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build"] }

src/tests/simulation/src/slt.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use sqllogictest::ParallelTestError;
2121

2222
use crate::client::RisingWave;
2323
use crate::cluster::{Cluster, KillOpts};
24+
use crate::utils::TimedExt;
2425

2526
fn is_create_table_as(sql: &str) -> bool {
2627
let parts: Vec<String> = sql
@@ -112,7 +113,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
112113

113114
// For normal records.
114115
if !kill {
115-
match tester.run_async(record).await {
116+
match tester
117+
.run_async(record.clone())
118+
.timed(|_res, elapsed| {
119+
println!("Record {:?} finished in {:?}", record, elapsed)
120+
})
121+
.await
122+
{
116123
Ok(_) => continue,
117124
Err(e) => panic!("{}", e),
118125
}
@@ -128,7 +135,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
128135
if cmd.ignore_kill() {
129136
for i in 0usize.. {
130137
let delay = Duration::from_secs(1 << i);
131-
if let Err(err) = tester.run_async(record.clone()).await {
138+
if let Err(err) = tester
139+
.run_async(record.clone())
140+
.timed(|_res, elapsed| {
141+
println!("Record {:?} finished in {:?}", record, elapsed)
142+
})
143+
.await
144+
{
132145
// cluster could be still under recovering if killed before, retry if
133146
// meets `no reader for dml in table with id {}`.
134147
let should_retry =
@@ -162,7 +175,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
162175
// retry up to 5 times until it succeed
163176
for i in 0usize.. {
164177
let delay = Duration::from_secs(1 << i);
165-
match tester.run_async(record.clone()).await {
178+
match tester
179+
.run_async(record.clone())
180+
.timed(|_res, elapsed| {
181+
println!("Record {:?} finished in {:?}", record, elapsed)
182+
})
183+
.await
184+
{
166185
Ok(_) => break,
167186
// allow 'table exists' error when retry CREATE statement
168187
Err(e)

src/tests/simulation/src/utils/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod assert_result;
16+
pub use assert_result::*;
17+
18+
mod timed_future;
19+
pub use timed_future::*;
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::future::Future;
16+
use std::pin::{pin, Pin};
17+
use std::task::{Context, Poll};
18+
use std::time::{Duration, Instant};
19+
20+
use pin_project::pin_project;
21+
22+
/// Inspired by https://stackoverflow.com/a/59935743/2990323
23+
/// A wrapper around a Future which adds timing data.
24+
#[pin_project]
25+
pub struct Timed<Fut, F>
26+
where
27+
Fut: Future,
28+
F: Fn(&Fut::Output, Duration),
29+
{
30+
#[pin]
31+
inner: Fut,
32+
f: F,
33+
start: Option<Instant>,
34+
}
35+
36+
impl<Fut, F> Future for Timed<Fut, F>
37+
where
38+
Fut: Future,
39+
F: Fn(&Fut::Output, Duration),
40+
{
41+
type Output = Fut::Output;
42+
43+
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
44+
let this = self.project();
45+
let start = this.start.get_or_insert_with(Instant::now);
46+
47+
match this.inner.poll(cx) {
48+
// If the inner future is still pending, this wrapper is still pending.
49+
Poll::Pending => Poll::Pending,
50+
51+
// If the inner future is done, measure the elapsed time and finish this wrapper future.
52+
Poll::Ready(v) => {
53+
let elapsed = start.elapsed();
54+
(this.f)(&v, elapsed);
55+
56+
Poll::Ready(v)
57+
}
58+
}
59+
}
60+
}
61+
62+
pub trait TimedExt: Sized + Future {
63+
fn timed<F>(self, f: F) -> Timed<Self, F>
64+
where
65+
F: Fn(&Self::Output, Duration),
66+
{
67+
Timed {
68+
inner: self,
69+
f,
70+
start: None,
71+
}
72+
}
73+
}
74+
75+
// All futures can use the `.timed` method defined above
76+
impl<F: Future> TimedExt for F {}

0 commit comments

Comments
 (0)