
Commit 8fb2d3f

fix(test): add timestamp to kafka messages in the simulation (#7778)
This PR includes the `kafka_batch.slt` simulation test in CI and adds timestamps to the Kafka messages produced in the simulation so that the test passes.

Approved-By: BugenZhao
1 parent dcc1ece commit 8fb2d3f

File tree

2 files changed (+14, -18 lines):

ci/scripts/deterministic-e2e-test.sh
src/tests/simulation/src/kafka.rs

2 files changed

+14
-18
lines changed

ci/scripts/deterministic-e2e-test.sh

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, batch"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'

src/tests/simulation/src/kafka.rs

Lines changed: 13 additions & 17 deletions
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::time::SystemTime;
 
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
-use rdkafka::consumer::StreamConsumer;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::producer::{BaseProducer, BaseRecord};
 use rdkafka::ClientConfig;
@@ -78,10 +78,19 @@ pub async fn producer(broker_addr: &str, datadir: String) {
             .expect("failed to create topic");
 
         let content = std::fs::read(file.path()).unwrap();
-        // binary message data, a file is a message
-        if topic.ends_with("bin") {
+        let msgs: Box<dyn Iterator<Item = &[u8]> + Send> = if topic.ends_with("bin") {
+            // binary message data, a file is a message
+            Box::new(std::iter::once(content.as_slice()))
+        } else {
+            Box::new(content.split(|&b| b == b'\n'))
+        };
+        for msg in msgs {
             loop {
-                let record = BaseRecord::<(), _>::to(topic).payload(&content);
+                let ts = SystemTime::now()
+                    .duration_since(SystemTime::UNIX_EPOCH)
+                    .unwrap()
+                    .as_millis() as i64;
+                let record = BaseRecord::<(), _>::to(topic).payload(msg).timestamp(ts);
                 match producer.send(record) {
                     Ok(_) => break,
                     Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
@@ -90,19 +99,6 @@ pub async fn producer(broker_addr: &str, datadir: String) {
                     Err((e, _)) => panic!("failed to send message: {}", e),
                 }
             }
-        } else {
-            for line in content.split(|&b| b == b'\n') {
-                loop {
-                    let record = BaseRecord::<(), _>::to(topic).payload(line);
-                    match producer.send(record) {
-                        Ok(_) => break,
-                        Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
-                            producer.flush(None).await.expect("failed to flush");
-                        }
-                        Err((e, _)) => panic!("failed to send message: {}", e),
-                    }
-                }
-            }
         }
         producer.flush(None).await.expect("failed to flush");
     }
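
For reference, below is a minimal standalone sketch of the producing pattern this commit adopts: attach a create-time timestamp, in milliseconds since the Unix epoch, to each record before sending. It is written against the upstream rdkafka crate rather than the madsim shim used by the simulation (where flush is awaited), and the function name, broker address, and topic are illustrative placeholders, not code from this repository.

// Sketch only: attach an explicit create-time timestamp to a Kafka record,
// mirroring the kafka.rs change above. Assumes the upstream rdkafka
// BaseProducer API; broker address and topic are placeholders.
use std::time::{Duration, SystemTime};

use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::ClientConfig;

fn produce_with_timestamp(broker_addr: &str, topic: &str, payload: &[u8]) {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", broker_addr)
        .create()
        .expect("failed to create producer");

    // Kafka message timestamps are milliseconds since the Unix epoch.
    let ts = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64;

    let record = BaseRecord::<(), _>::to(topic).payload(payload).timestamp(ts);
    if let Err((e, _)) = producer.send(record) {
        panic!("failed to enqueue message: {}", e);
    }

    // Drain the producer's internal queue before returning; the return type
    // of flush differs across rdkafka versions, so it is ignored here.
    let _ = producer.flush(Duration::from_secs(5));
}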
