Skip to content

Commit 24fe1e8

Browse files
fix(batch, source): Propagate user errors (risingwavelabs#8493)
Signed-off-by: Runji Wang <[email protected]> Co-authored-by: jon-chuang <[email protected]> Co-authored-by: Runji Wang <[email protected]>
1 parent 81b4d59 commit 24fe1e8

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

src/connector/src/macros.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,9 @@ macro_rules! impl_common_parser_logic {
182182
if let Err(e) = self.parse_inner(content.as_ref(), builder.row_writer())
183183
.await
184184
{
185-
self.source_ctx.report_stream_source_error(&e);
186185
tracing::warn!("message parsing failed {}, skipping", e.to_string());
186+
// This will throw an error for batch
187+
self.source_ctx.report_user_source_error(e)?;
187188
continue;
188189
}
189190

src/connector/src/source/base.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use parking_lot::Mutex;
2525
use prost::Message;
2626
use risingwave_common::array::StreamChunk;
2727
use risingwave_common::catalog::TableId;
28-
use risingwave_common::error::{ErrorCode, ErrorSuppressor, RwError};
28+
use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
2929
use risingwave_pb::connector_service::TableSchema;
3030
use risingwave_pb::source::ConnectorSplit;
3131
use serde::{Deserialize, Serialize};
@@ -107,10 +107,10 @@ impl SourceContext {
107107
self.error_suppressor = Some(error_suppressor)
108108
}
109109

110-
pub(crate) fn report_stream_source_error(&self, e: &RwError) {
111-
// Do not report for batch
110+
pub(crate) fn report_user_source_error(&self, e: RwError) -> RwResult<()> {
111+
// Repropagate the error if batch
112112
if self.source_info.fragment_id == u32::MAX {
113-
return;
113+
return Err(e);
114114
}
115115
let mut err_str = e.inner().to_string();
116116
if let Some(suppressor) = &self.error_suppressor &&
@@ -130,6 +130,7 @@ impl SourceContext {
130130
&self.source_info.source_id.table_id.to_string(),
131131
])
132132
.inc();
133+
Ok(())
133134
}
134135
}
135136

src/tests/simulation/src/kafka.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ pub async fn producer(broker_addr: &str, datadir: String) {
8282
// binary message data, a file is a message
8383
Box::new(std::iter::once(content.as_slice()))
8484
} else {
85-
Box::new(content.split(|&b| b == b'\n'))
85+
Box::new(
86+
content
87+
.split(|&b| b == b'\n')
88+
.filter(|line| !line.is_empty()),
89+
)
8690
};
8791
for msg in msgs {
8892
loop {

0 commit comments

Comments
 (0)