Skip to content

Commit bba43ba

Browse files
authored
feat: make gen timestamp deterministic (risingwavelabs#8619)
Signed-off-by: tabVersion <[email protected]>
1 parent 18863e0 commit bba43ba

File tree

3 files changed

+44
-9
lines changed

3 files changed

+44
-9
lines changed

src/common/src/field_generator/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod varchar;
1919
use std::time::Duration;
2020

2121
use anyhow::Result;
22+
use chrono::{DateTime, FixedOffset};
2223
pub use numeric::*;
2324
use serde_json::Value;
2425
pub use timestamp::*;
@@ -155,11 +156,13 @@ impl FieldGeneratorImpl {
155156
}
156157

157158
pub fn with_timestamp(
159+
base: Option<DateTime<FixedOffset>>,
158160
max_past: Option<String>,
159161
max_past_mode: Option<String>,
160162
seed: u64,
161163
) -> Result<Self> {
162164
Ok(FieldGeneratorImpl::Timestamp(TimestampField::new(
165+
base,
163166
max_past,
164167
max_past_mode,
165168
seed,
@@ -293,7 +296,7 @@ mod tests {
293296
let mut generator = match data_type {
294297
DataType::Varchar => FieldGeneratorImpl::with_varchar(None, seed).unwrap(),
295298
DataType::Timestamp => {
296-
FieldGeneratorImpl::with_timestamp(None, None, seed).unwrap()
299+
FieldGeneratorImpl::with_timestamp(None, None, None, seed).unwrap()
297300
}
298301
_ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(),
299302
};
@@ -321,4 +324,16 @@ mod tests {
321324
assert_eq!(datum2_new, datum2);
322325
}
323326
}
327+
328+
#[test]
329+
fn test_deterministic_timestamp() {
330+
let seed = 1234;
331+
let base_time: DateTime<FixedOffset> =
332+
DateTime::parse_from_rfc3339("2020-01-01T00:00:00+00:00").unwrap();
333+
let mut generator =
334+
FieldGeneratorImpl::with_timestamp(Some(base_time), None, None, seed).unwrap();
335+
let val1 = generator.generate_json(1);
336+
let val2 = generator.generate_json(1);
337+
assert_eq!(val1, val2);
338+
}
324339
}

src/common/src/field_generator/timestamp.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ enum LocalNow {
3131
}
3232

3333
pub struct TimestampField {
34+
base: Option<DateTime<FixedOffset>>,
3435
max_past: Duration,
3536
local_now: LocalNow,
3637
seed: u64,
3738
}
3839

3940
impl TimestampField {
4041
pub fn new(
42+
base: Option<DateTime<FixedOffset>>,
4143
max_past_option: Option<String>,
4244
max_past_mode: Option<String>,
4345
seed: u64,
@@ -61,6 +63,7 @@ impl TimestampField {
6163
};
6264
debug!(?local_now, ?max_past, "parse timestamp field option");
6365
Ok(Self {
66+
base,
6467
// convert to chrono::Duration
6568
max_past: chrono::Duration::from_std(max_past)?,
6669
local_now,
@@ -72,12 +75,15 @@ impl TimestampField {
7275
let milliseconds = self.max_past.num_milliseconds();
7376
let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
7477
let max_milliseconds = rng.gen_range(0..=milliseconds);
75-
let now = match self.local_now {
76-
LocalNow::Relative => Local::now()
77-
.naive_local()
78-
.duration_round(Duration::microseconds(1))
79-
.unwrap(),
80-
LocalNow::Absolute(now) => now,
78+
let now = match self.base {
79+
Some(base) => base.naive_local(),
80+
None => match self.local_now {
81+
LocalNow::Relative => Local::now()
82+
.naive_local()
83+
.duration_round(Duration::microseconds(1))
84+
.unwrap(),
85+
LocalNow::Absolute(now) => now,
86+
},
8187
};
8288
now - Duration::milliseconds(max_milliseconds)
8389
}

src/connector/src/source/datagen/source/reader.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use std::collections::HashMap;
1616

17-
use anyhow::Result;
17+
use anyhow::{anyhow, Result};
1818
use async_trait::async_trait;
1919
use futures::{StreamExt, TryStreamExt};
2020
use futures_async_stream::try_stream;
@@ -204,8 +204,22 @@ fn generator_from_data_type(
204204
let max_past_mode_value = fields_option_map
205205
.get(&max_past_mode_key)
206206
.map(|s| s.to_lowercase());
207+
let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str())
208+
{
209+
Some(base) => {
210+
Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| {
211+
anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e)
212+
})?)
213+
}
214+
None => None,
215+
};
207216

208-
FieldGeneratorImpl::with_timestamp(max_past_value, max_past_mode_value, random_seed)
217+
FieldGeneratorImpl::with_timestamp(
218+
basetime,
219+
max_past_value,
220+
max_past_mode_value,
221+
random_seed,
222+
)
209223
}
210224
DataType::Varchar => {
211225
let length_key = format!("fields.{}.length", name);

0 commit comments

Comments
 (0)