Skip to content

Commit 999b9d2

Browse files
wugouzi and tabVersion authored
feat: auto rebuild source reader in executor (#11993)
Co-authored-by: Bohan Zhang <[email protected]>
1 parent fa8b5e3 commit 999b9d2

File tree

7 files changed

+247
-132
lines changed

7 files changed

+247
-132
lines changed

docker/dashboards/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

grafana/risingwave-dev-dashboard.dashboard.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1443,6 +1443,16 @@ def section_streaming_errors(outer_panels):
14431443
),
14441444
],
14451445
),
1446+
panels.timeseries_count(
1447+
"Source Reader Errors by Type",
1448+
"",
1449+
[
1450+
panels.target(
1451+
f"sum({metric('user_source_reader_error_count')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
1452+
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
1453+
),
1454+
],
1455+
),
14461456
],
14471457
),
14481458
]

grafana/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/connector/src/parser/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,8 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
517517

518518
Err(error) => {
519519
tracing::warn!(%error, "message parsing failed, skipping");
520-
// This will throw an error for batch
521-
parser.source_ctx().report_user_source_error(error)?;
520+
// Skip for batch
521+
parser.source_ctx().report_user_source_error(error);
522522
continue;
523523
}
524524
}

src/connector/src/source/base.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use itertools::Itertools;
2525
use parking_lot::Mutex;
2626
use risingwave_common::array::StreamChunk;
2727
use risingwave_common::catalog::TableId;
28-
use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
28+
use risingwave_common::error::{ErrorCode, ErrorSuppressor, RwError};
2929
use risingwave_common::types::{JsonbVal, Scalar};
3030
use risingwave_pb::connector_service::PbTableSchema;
3131
use risingwave_pb::source::ConnectorSplit;
@@ -172,10 +172,9 @@ impl SourceContext {
172172
ctx
173173
}
174174

175-
pub(crate) fn report_user_source_error(&self, e: RwError) -> RwResult<()> {
176-
// Repropagate the error if batch
175+
pub(crate) fn report_user_source_error(&self, e: RwError) {
177176
if self.source_info.fragment_id == u32::MAX {
178-
return Err(e);
177+
return;
179178
}
180179
let mut err_str = e.inner().to_string();
181180
if let Some(suppressor) = &self.error_suppressor
@@ -198,7 +197,6 @@ impl SourceContext {
198197
&self.source_info.source_id.table_id.to_string(),
199198
])
200199
.inc();
201-
Ok(())
202200
}
203201
}
204202

src/stream/src/executor/monitor/streaming_stats.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ pub struct StreamingMetrics {
121121
/// User compute error reporting
122122
pub user_compute_error_count: GenericCounterVec<AtomicU64>,
123123

124+
/// User source reader error
125+
pub user_source_reader_error_count: GenericCounterVec<AtomicU64>,
126+
124127
// Materialize
125128
pub materialize_cache_hit_count: GenericCounterVec<AtomicU64>,
126129
pub materialize_cache_total_count: GenericCounterVec<AtomicU64>,
@@ -669,6 +672,20 @@ impl StreamingMetrics {
669672
)
670673
.unwrap();
671674

675+
let user_source_reader_error_count = register_int_counter_vec_with_registry!(
676+
"user_source_reader_error_count",
677+
"Source reader error count",
678+
&[
679+
"error_type",
680+
"error_msg",
681+
"executor_name",
682+
"actor_id",
683+
"source_id"
684+
],
685+
registry,
686+
)
687+
.unwrap();
688+
672689
let materialize_cache_hit_count = register_int_counter_vec_with_registry!(
673690
"stream_materialize_cache_hit_count",
674691
"Materialize executor cache hit count",
@@ -761,6 +778,7 @@ impl StreamingMetrics {
761778
jemalloc_allocated_bytes,
762779
jemalloc_active_bytes,
763780
user_compute_error_count,
781+
user_source_reader_error_count,
764782
materialize_cache_hit_count,
765783
materialize_cache_total_count,
766784
stream_memory_usage,

0 commit comments

Comments
 (0)