
Bug: consumption of connector source will hang forever if it encounters an error #7192

@StrikeW

Description

Describe the bug

If the connector source propagates an error up to the source executor, consumption of the source stops forever:

error!("hang up stream reader due to polling error: {}", err);
futures::future::pending().stack_trace("source_error").await

2023-01-04T11:56:46.373988Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.375555Z INFO risingwave_connector::source::kafka::source::reader:135: kafka message error: Err(KafkaError (Partition EOF: 11))
2023-01-04T11:56:46.376565Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11
2023-01-04T11:56:46.377123Z ERROR risingwave_stream::executor::source::reader:61: hang up stream reader due to polling error: internal error: Partition EOF: 11

But barrier messages can still flow to downstream executors, since the pending source arm does not block the barrier arm of the select:

select_with_strategy(
    barrier_receiver_arm,
    source_stream_arm,
    // We prefer barrier on the left hand side over source chunks.
    |_: &mut ()| PollNext::Left,
)

I am not sure whether this behavior is by design.
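
To illustrate why the barrier arm keeps making progress while the source arm is stuck, here is a minimal, self-contained sketch. It is my own example using only the futures crate with dummy string items, not the actual executor types:

use futures::executor::block_on;
use futures::stream::{self, PollNext, StreamExt};

fn main() {
    block_on(async {
        // Left arm: "barriers" keep arriving.
        let barriers = stream::iter(["barrier 1", "barrier 2", "barrier 3"]);
        // Right arm: a source stream that never yields again, as if it hung up on an error.
        let source = stream::pending::<&str>();

        // Same strategy as the source executor: always prefer the left (barrier) arm.
        let mut merged =
            stream::select_with_strategy(barriers, source, |_: &mut ()| PollNext::Left);

        // All three "barriers" are still delivered even though the source arm pends forever.
        for _ in 0..3 {
            println!("{}", merged.next().await.unwrap());
        }
    });
}

Running this prints the three "barriers" and then blocks, which matches the observed behavior: barriers keep flowing while data consumption is stuck.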

To Reproduce

Manually construct an error in the KafkaSplitReader and propagate it to the upper layer.

diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index ab993dfde..e24706f76 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -21,6 +21,7 @@ use futures::StreamExt;
 use futures_async_stream::try_stream;
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
+use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

 use crate::source::base::{SourceMessage, SplitReader, MAX_CHUNK_SIZE};
@@ -129,6 +130,8 @@ impl KafkaSplitReader {
         #[for_await]
         'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
             for msg in msgs {
+                let fake_msg = Err::<(), KafkaError>(KafkaError::PartitionEOF(11));
+                fake_msg?;
                 let msg = msg?;
                 let cur_offset = msg.offset();
                 bytes_current_second += match &msg.payload() {

Then start the cluster with the full config and run the tpch-bench script to ingest data.

Expected behavior

IIUC, when executors encounter an error in other scenarios, the error is reported to Meta via the barrier collection mechanism (#6319). Meta then enters the recovery process to recover the whole streaming graph.

But in the case of a connector source failure, the upstream system may become available again after a while, or the failure may be genuinely unrecoverable. So it may be wasteful to let Meta blindly recover the cluster:

if let Err(err) = result {
    // FIXME: If it is a connector source error occurred in the init barrier, we should pass
    // back to frontend
    fail_point!("inject_barrier_err_success");
    let fail_node = checkpoint_control.barrier_failed();
    tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err);
    self.do_recovery(err, fail_node, state, tracker, checkpoint_control)
        .await;
    return;
}

Candidate solutions:

  1. Employ a bounded retry strategy for connector sources.
    When a connector source encounters an error, we try our best to recover the consumer client. For example, we can drop the current consumer and create a new one to try to resume consumption. If we still fail to recover the consumer after the retries are exhausted, we can hang up the connector source stream and prompt users to drop the source and troubleshoot the upstream system. (A sketch of this option follows the list.)

  2. Hang up the connector source stream forever, as the current implementation does, and prompt users to drop the source and troubleshoot the upstream system. For example, we can surface an error to users when they query the downstream MVs of the broken source.
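
Below is a minimal sketch of candidate solution 1. It is only an illustration under assumptions, not RisingWave's actual executor code: StreamChunk, build_source_stream, and consume_with_bounded_retry are hypothetical placeholders, and the anyhow, futures, tokio, and tracing crates are assumed as dependencies. The retry budget resets whenever the consumer makes progress, and once it is exhausted the error is surfaced instead of pending forever:

use std::time::Duration;

use anyhow::{anyhow, Result};
use futures::{Stream, StreamExt};
use tokio::time::sleep;

// Placeholder chunk type standing in for the real connector output.
struct StreamChunk;

// Hypothetical helper: build a fresh reader/consumer for the connector source.
// In the real executor this would recreate, e.g., the KafkaSplitReader.
async fn build_source_stream() -> Result<impl Stream<Item = Result<StreamChunk>> + Unpin> {
    Ok(futures::stream::empty())
}

// On error, drop the consumer and rebuild it, up to `max_retries` times with
// exponential backoff; after that, give up and surface the error.
async fn consume_with_bounded_retry(
    max_retries: u32,
    mut on_chunk: impl FnMut(StreamChunk),
) -> Result<()> {
    let mut attempts = 0;
    loop {
        let mut stream = build_source_stream().await?;
        loop {
            match stream.next().await {
                Some(Ok(chunk)) => {
                    attempts = 0; // successful progress resets the retry budget
                    on_chunk(chunk);
                }
                Some(Err(err)) if attempts < max_retries => {
                    attempts += 1;
                    tracing::warn!(
                        "connector source error: {}, rebuilding consumer ({}/{})",
                        err, attempts, max_retries
                    );
                    sleep(Duration::from_secs(1u64 << attempts)).await;
                    break; // drop the broken consumer and create a new one
                }
                Some(Err(err)) => {
                    // Retry budget exhausted: surface the error instead of pending forever,
                    // so users can be prompted to drop the source and check the upstream system.
                    return Err(anyhow!(
                        "connector source failed after {} retries: {}",
                        max_retries, err
                    ));
                }
                None => return Ok(()), // upstream finished normally
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Example: retry at most 3 times; the closure is where chunks would go downstream.
    consume_with_bounded_retry(3, |_chunk| {}).await
}

Resetting the budget on successful progress distinguishes a transient upstream hiccup from a persistent failure, and surfacing the final error (rather than pending) is what would let the system prompt users as described in option 2.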

Additional context

No response

Labels

P-high (Priority: High), type/bug (Type: Bug. Only for issues.)
