Skip to content

Commit 24ecd4d

Browse files
authored
chore: fix unstable unit test (risingwavelabs#8674)
Signed-off-by: tabVersion <[email protected]>
1 parent d967fcc commit 24ecd4d

File tree

8 files changed

+89
-41
lines changed

8 files changed

+89
-41
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/batch/src/executor/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@ pub fn gen_sorted_data(
6666
batch_num: usize,
6767
start: String,
6868
step: u64,
69+
offset: u64,
6970
) -> Vec<DataChunk> {
7071
let mut data_gen = FieldGeneratorImpl::with_number_sequence(
7172
DataType::Int64,
7273
Some(start),
7374
Some(i64::MAX.to_string()),
7475
0,
7576
step,
77+
offset,
7678
)
7779
.unwrap();
7880
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);

src/common/src/field_generator/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,13 @@ pub trait NumericFieldRandomGenerator {
5252

5353
/// fields that can be continuously generated impl this trait
5454
pub trait NumericFieldSequenceGenerator {
55-
fn new(start: Option<String>, end: Option<String>, offset: u64, step: u64) -> Result<Self>
55+
fn new(
56+
start: Option<String>,
57+
end: Option<String>,
58+
offset: u64,
59+
step: u64,
60+
event_offset: u64,
61+
) -> Result<Self>
5662
where
5763
Self: Sized;
5864

@@ -93,37 +99,43 @@ impl FieldGeneratorImpl {
9399
end: Option<String>,
94100
split_index: u64,
95101
split_num: u64,
102+
offset: u64,
96103
) -> Result<Self> {
97104
match data_type {
98105
DataType::Int16 => Ok(FieldGeneratorImpl::I16Sequence(I16SequenceField::new(
99106
start,
100107
end,
101108
split_index,
102109
split_num,
110+
offset,
103111
)?)),
104112
DataType::Int32 => Ok(FieldGeneratorImpl::I32Sequence(I32SequenceField::new(
105113
start,
106114
end,
107115
split_index,
108116
split_num,
117+
offset,
109118
)?)),
110119
DataType::Int64 => Ok(FieldGeneratorImpl::I64Sequence(I64SequenceField::new(
111120
start,
112121
end,
113122
split_index,
114123
split_num,
124+
offset,
115125
)?)),
116126
DataType::Float32 => Ok(FieldGeneratorImpl::F32Sequence(F32SequenceField::new(
117127
start,
118128
end,
119129
split_index,
120130
split_num,
131+
offset,
121132
)?)),
122133
DataType::Float64 => Ok(FieldGeneratorImpl::F64Sequence(F64SequenceField::new(
123134
start,
124135
end,
125136
split_index,
126137
split_num,
138+
offset,
127139
)?)),
128140
_ => unimplemented!(),
129141
}
@@ -265,6 +277,7 @@ mod tests {
265277
Some("20".to_string()),
266278
split_index,
267279
split_num,
280+
0,
268281
)
269282
.unwrap(),
270283
);

src/common/src/field_generator/numeric.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ where
107107
end_option: Option<String>,
108108
offset: u64,
109109
step: u64,
110+
event_offset: u64,
110111
) -> Result<Self>
111112
where
112113
Self: Sized,
@@ -127,7 +128,9 @@ where
127128
end,
128129
offset,
129130
step,
130-
..Default::default()
131+
cur: T::from(event_offset).ok_or_else(|| {
132+
anyhow::anyhow!("event offset is too big, offset: {}", event_offset,)
133+
})?,
131134
})
132135
}
133136

@@ -194,7 +197,7 @@ mod tests {
194197
#[test]
195198
fn test_sequence_field_generator() {
196199
let mut i16_field =
197-
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1).unwrap();
200+
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
198201
for i in 5..=10 {
199202
assert_eq!(i16_field.generate(), json!(i));
200203
}
@@ -222,7 +225,8 @@ mod tests {
222225
#[test]
223226
fn test_sequence_datum_generator() {
224227
let mut f32_field =
225-
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1).unwrap();
228+
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1, 0)
229+
.unwrap();
226230

227231
for i in 5..=10 {
228232
assert_eq!(
@@ -247,13 +251,13 @@ mod tests {
247251
#[test]
248252
fn test_sequence_field_generator_float() {
249253
let mut f64_field =
250-
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1).unwrap();
254+
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
251255
for i in 0..=10 {
252256
assert_eq!(f64_field.generate(), json!(i as f64));
253257
}
254258

255259
let mut f32_field =
256-
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1).unwrap();
260+
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1, 0).unwrap();
257261
for i in -5..=5 {
258262
assert_eq!(f32_field.generate(), json!(i as f32));
259263
}

src/connector/src/source/datagen/source/generator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ mod tests {
241241
Some(end.to_string()),
242242
split_index,
243243
split_num,
244+
0,
244245
)
245246
.unwrap(),
246247
),
@@ -251,6 +252,7 @@ mod tests {
251252
Some(end.to_string()),
252253
split_index,
253254
split_num,
255+
0,
254256
)
255257
.unwrap(),
256258
),

src/connector/src/source/datagen/source/reader.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl SplitReader for DatagenSplitReader {
5959
let mut events_so_far = u64::default();
6060
tracing::debug!("Splits for datagen found! {:?}", splits);
6161

62-
assert!(splits.len() == 1);
62+
debug_assert!(splits.len() == 1);
6363
let split = splits.into_iter().next().unwrap();
6464
// TODO: currently, assume there's only on split in one reader
6565
let split_id = split.id();
@@ -114,6 +114,7 @@ impl SplitReader for DatagenSplitReader {
114114
&column.name,
115115
split_index,
116116
split_num,
117+
events_so_far,
117118
)?)
118119
} else {
119120
FieldDesc::Invisible
@@ -172,6 +173,7 @@ fn generator_from_data_type(
172173
name: &String,
173174
split_index: u64,
174175
split_num: u64,
176+
offset: u64,
175177
) -> Result<FieldGeneratorImpl> {
176178
let random_seed_key = format!("fields.{}.seed", name);
177179
let random_seed: u64 = match fields_option_map
@@ -236,6 +238,7 @@ fn generator_from_data_type(
236238
&format!("{}.{}", name, field_name),
237239
split_index,
238240
split_num,
241+
offset,
239242
)?;
240243
Ok((field_name, gen))
241244
})
@@ -251,6 +254,7 @@ fn generator_from_data_type(
251254
&format!("{}._", name),
252255
split_index,
253256
split_num,
257+
offset,
254258
)?;
255259
FieldGeneratorImpl::with_list(generator, length_value)
256260
}
@@ -267,7 +271,8 @@ fn generator_from_data_type(
267271
start_value,
268272
end_value,
269273
split_index,
270-
split_num
274+
split_num,
275+
offset,
271276
)
272277
} else {
273278
let min_key = format!("fields.{}.min", name);

src/stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,4 @@ workspace-hack = { path = "../workspace-hack" }
7777
[dev-dependencies]
7878
assert_matches = "1"
7979
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
80+
tracing-test = "0.2"

src/stream/src/executor/source/source_executor.rs

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -488,21 +488,21 @@ impl<S: StateStore> Debug for SourceExecutor<S> {
488488

489489
#[cfg(test)]
490490
mod tests {
491-
use std::sync::atomic::AtomicU64;
491+
492492
use std::time::Duration;
493493

494494
use maplit::{convert_args, hashmap};
495495
use risingwave_common::array::StreamChunk;
496-
use risingwave_common::catalog::{ColumnId, ConflictBehavior, Field, Schema, TableId};
496+
use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
497497
use risingwave_common::test_prelude::StreamChunkTestExt;
498498
use risingwave_common::types::DataType;
499-
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
500499
use risingwave_connector::source::datagen::DatagenSplit;
501500
use risingwave_pb::catalog::StreamSourceInfo;
502501
use risingwave_pb::plan_common::PbRowFormatType;
503502
use risingwave_source::connector_test_utils::create_source_desc_builder;
504503
use risingwave_storage::memory::MemoryStateStore;
505504
use tokio::sync::mpsc::unbounded_channel;
505+
use tracing_test::traced_test;
506506

507507
use super::*;
508508
use crate::executor::ActorContext;
@@ -600,6 +600,7 @@ mod tests {
600600
);
601601
}
602602

603+
#[traced_test]
603604
#[tokio::test]
604605
async fn test_split_change_mutation() {
605606
let table_id = TableId::default();
@@ -615,9 +616,9 @@ mod tests {
615616
};
616617
let properties = convert_args!(hashmap!(
617618
"connector" => "datagen",
618-
"fields.v1.min" => "1",
619-
"fields.v1.max" => "1000",
620-
"fields.v1.seed" => "12345",
619+
"fields.v1.kind" => "sequence",
620+
"fields.v1.start" => "11",
621+
"fields.v1.end" => "11111",
621622
));
622623

623624
let source_desc_builder = create_source_desc_builder(
@@ -658,20 +659,7 @@ mod tests {
658659
u64::MAX,
659660
1,
660661
);
661-
662-
let mut materialize = MaterializeExecutor::for_test(
663-
Box::new(executor),
664-
mem_state_store.clone(),
665-
TableId::from(0x2333),
666-
vec![ColumnOrder::new(0, OrderType::ascending())],
667-
column_ids,
668-
2,
669-
Arc::new(AtomicU64::new(0)),
670-
ConflictBehavior::NoCheck,
671-
)
672-
.await
673-
.boxed()
674-
.execute();
662+
let mut handler = Box::new(executor).execute();
675663

676664
let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add {
677665
adds: HashMap::new(),
@@ -687,11 +675,11 @@ mod tests {
687675
});
688676
barrier_tx.send(init_barrier).unwrap();
689677

690-
(materialize.next().await.unwrap().unwrap())
678+
(handler.next().await.unwrap().unwrap())
691679
.into_barrier()
692680
.unwrap();
693681

694-
let mut ready_chunks = materialize.ready_chunks(10);
682+
let mut ready_chunks = handler.ready_chunks(10);
695683
let chunks = (ready_chunks.next().await.unwrap())
696684
.into_iter()
697685
.map(|msg| msg.unwrap().into_chunk().unwrap())
@@ -701,10 +689,10 @@ mod tests {
701689
chunk_1,
702690
StreamChunk::from_pretty(
703691
" i
704-
+ 533
705-
+ 833
706-
+ 738
707-
+ 344",
692+
+ 11
693+
+ 14
694+
+ 17
695+
+ 20",
708696
)
709697
);
710698

@@ -719,6 +707,11 @@ mod tests {
719707
split_num: 3,
720708
start_offset: None,
721709
}),
710+
SplitImpl::Datagen(DatagenSplit {
711+
split_index: 2,
712+
split_num: 3,
713+
start_offset: None,
714+
}),
722715
];
723716

724717
let change_split_mutation =
@@ -751,18 +744,22 @@ mod tests {
751744
let chunk_2 = StreamChunk::concat(chunks).sort_rows();
752745
assert_eq!(
753746
chunk_2,
754-
// mixed from datagen split 0 and 1
747+
// mixed from datagen split 0, 1 and 2
755748
StreamChunk::from_pretty(
756749
" i
750+
+ 12
751+
+ 13
752+
+ 15
753+
+ 16
754+
+ 18
755+
+ 19
756+
+ 23
757+
+ 26
757758
+ 29
758-
+ 201
759-
+ 344
760-
+ 425
761-
+ 525
762-
+ 533
763-
+ 833",
759+
+ 32",
764760
)
765761
);
762+
tracing::debug!("chunk_2: {:?}", chunk_2.to_pretty_string());
766763

767764
let barrier = Barrier::new_test_barrier(3).with_mutation(Mutation::Pause);
768765
barrier_tx.send(barrier).unwrap();

0 commit comments

Comments
 (0)