Skip to content

Commit a1d084d

Browse files
authored
fix(streaming): also enable schema check in release profile (risingwavelabs#8711)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent ad61a71 commit a1d084d

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

src/stream/src/executor/wrapper.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ impl WrapperExecutor {
118118
let stream =
119119
trace::instrument_await_tree(info.clone(), extra.actor_id, extra.executor_id, stream);
120120

121+
// Schema check
122+
let stream = schema_check::schema_check(info.clone(), stream);
121123
// Epoch check
122124
let stream = epoch_check::epoch_check(info, stream);
123125

src/stream/src/executor/wrapper/schema_check.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,25 @@ pub async fn schema_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
2727
for message in input {
2828
let message = message?;
2929

30-
if let Message::Chunk(chunk) = &message {
31-
risingwave_common::util::schema_check::schema_check(
30+
match &message {
31+
Message::Chunk(chunk) => risingwave_common::util::schema_check::schema_check(
3232
info.schema.fields().iter().map(|f| &f.data_type),
3333
chunk.columns(),
34-
)
35-
.unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e));
34+
),
35+
Message::Watermark(watermark) => {
36+
let expected = info.schema.fields()[watermark.col_idx].data_type();
37+
let found = &watermark.data_type;
38+
if &expected != found {
39+
Err(format!(
40+
"watermark type mismatched: expected {expected}, found {found}"
41+
))
42+
} else {
43+
Ok(())
44+
}
45+
}
46+
Message::Barrier(_) => Ok(()),
3647
}
48+
.unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e));
3749

3850
yield message;
3951
}

0 commit comments

Comments
 (0)