Describe the bug
If the connector source throws an error up to the source executor, consumption from that source stops forever:
risingwave/src/stream/src/executor/source/reader.rs
Lines 61 to 62 in 6f39f43
    error!("hang up stream reader due to polling error: {}", err);
    futures::future::pending().stack_trace("source_error").await
2023-01-04T11:56:46.373988Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.375555Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.376565Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11
2023-01-04T11:56:46.377123Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11
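For illustration, here is a minimal self-contained sketch of the same pattern (not RisingWave code; the `drive` function and the toy stream are made up): after the first error, the task parks on a never-completing future, so nothing further is consumed.

```rust
use futures::{future, stream, StreamExt};

// Drives a fallible stream; on the first error it parks forever, mirroring the
// `futures::future::pending().await` call in the source executor snippet above.
async fn drive(mut source: impl stream::Stream<Item = Result<u64, String>> + Unpin) {
    while let Some(item) = source.next().await {
        match item {
            Ok(v) => println!("consumed chunk {v}"),
            Err(err) => {
                eprintln!("hang up stream reader due to polling error: {err}");
                // This future never resolves, so consumption stops here forever.
                future::pending::<()>().await;
            }
        }
    }
}

fn main() {
    // Two good chunks, then an error: only the first two are ever consumed,
    // and block_on never returns afterwards.
    let items = stream::iter(vec![Ok(1), Ok(2), Err("Partition EOF: 11".to_string())]);
    futures::executor::block_on(drive(items));
}
```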
But the barrier messages can still pass to downstream executors:
risingwave/src/stream/src/executor/source/reader.rs
Lines 90 to 95 in 98ae6ce
    select_with_strategy(
        barrier_receiver_arm,
        source_stream_arm,
        // We prefer barrier on the left hand side over source chunks.
        |_: &mut ()| PollNext::Left,
    )
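As a rough standalone sketch of why this happens (assuming the `futures` crate's `select_with_strategy` and `stream::pending`; the stand-in streams are made up and this is not the actual executor code): the merged stream prefers the left (barrier) arm, and a forever-pending right arm does not block it.

```rust
use futures::stream::{self, PollNext, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Stand-in for the barrier receiver arm: it keeps producing barriers.
        let barriers = stream::iter(vec!["barrier 1", "barrier 2", "barrier 3"]);
        // Stand-in for the hung source arm: it never yields anything again.
        let hung_source = stream::pending::<&str>();

        let mut merged = stream::select_with_strategy(
            barriers,
            hung_source,
            // Always poll the left (barrier) arm first, as in the snippet above.
            |_: &mut ()| PollNext::Left,
        );

        // All three barriers still come through even though the source arm is stuck.
        for _ in 0..3 {
            if let Some(msg) = merged.next().await {
                println!("{msg}");
            }
        }
    });
}
```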
I am not sure whether this behavior is by design.
To Reproduce
Manually construct an error in the KafkaSplitReader and propagate it to the upper layer:
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index ab993dfde..e24706f76 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -21,6 +21,7 @@ use futures::StreamExt;
 use futures_async_stream::try_stream;
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
+use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

 use crate::source::base::{SourceMessage, SplitReader, MAX_CHUNK_SIZE};
@@ -129,6 +130,8 @@ impl KafkaSplitReader {
         #[for_await]
         'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
             for msg in msgs {
+                let fake_msg: Result<(), _> = Err(KafkaError::PartitionEOF(11));
+                fake_msg?;
                 let msg = msg?;
                 let cur_offset = msg.offset();
                 bytes_current_second += match &msg.payload() {
Then start the cluster with the full config and run the tpch-bench script to ingest data.
Expected behavior
IIUC, when executors encounter an error in other scenarios, the error is reported to Meta via the barrier collection mechanism (#6319), and Meta then enters the recovery process to recover the whole streaming graph.
But in the case of a connector source failure, the upstream system may become available again after a while, or the failure may be truly unrecoverable. So it may be wasteful to let Meta blindly recover the cluster:
risingwave/src/meta/src/barrier/mod.rs
Lines 772 to 780 in 1ba6981
    if let Err(err) = result {
        // FIXME: If it is a connector source error occurred in the init barrier, we should pass
        // back to frontend
        fail_point!("inject_barrier_err_success");
        let fail_node = checkpoint_control.barrier_failed();
        tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err);
        self.do_recovery(err, fail_node, state, tracker, checkpoint_control)
            .await;
        return;
Candidate solutions:
- Employ a bounded retry strategy for connector sources. When a connector source encounters an error, we try our best to recover the consumer client. For example, we can drop the current consumer and create a new one to try to resume consumption. If we fail to recover the consumer, we can choose to hang up the connector source stream and prompt users to drop the source and troubleshoot the upstream system (see the sketch after this list).
- Hang up the connector source stream forever, as the current implementation does, and prompt users to drop the source and troubleshoot the upstream system. For example, we can show an error to users when they query the downstream MVs of the broken source.
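A minimal sketch of the first candidate (bounded retry), using hypothetical names (`rebuild_consumer`, `consume_with_bounded_retry`) rather than actual RisingWave APIs: retry recreating the consumer with exponential backoff up to a fixed budget, then fall back to hanging up and surfacing the error.

```rust
use std::thread::sleep;
use std::time::Duration;

const MAX_RETRIES: u32 = 5;

// Hypothetical stand-in for "drop the current consumer and create a new one".
// Here it always fails, pretending the upstream Kafka is still unreachable.
fn rebuild_consumer() -> Result<(), String> {
    Err("broker unreachable".to_string())
}

fn consume_with_bounded_retry() -> Result<(), String> {
    let mut attempt = 0;
    loop {
        match rebuild_consumer() {
            Ok(consumer) => {
                // In the real executor we would resume consumption from the stored offsets here.
                let _ = consumer;
                return Ok(());
            }
            Err(err) if attempt < MAX_RETRIES => {
                attempt += 1;
                let backoff = Duration::from_millis(100 * 2u64.pow(attempt));
                eprintln!("source error: {err}; retry {attempt}/{MAX_RETRIES} after {backoff:?}");
                sleep(backoff);
            }
            Err(err) => {
                // Retry budget exhausted: fall back to today's behavior, i.e. hang up the
                // stream and prompt the user to drop the source and check the upstream system.
                return Err(format!("giving up after {MAX_RETRIES} retries: {err}"));
            }
        }
    }
}

fn main() {
    if let Err(e) = consume_with_bounded_retry() {
        eprintln!("{e}");
    }
}
```

A bounded budget keeps transient upstream hiccups from triggering a full recovery, while still converging to the current hang-up behavior when the upstream system is genuinely gone.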
Additional context
No response