Skip to content

Commit fa09de3

Browse files
dsmith3197 authored and jszwedko committed
fix(kafka sink): Make KafkaService return Poll::Pending when producer queue is full (vectordotdev#18770)
* fix(kafka sink): set concurrency limits equal to kafka producer queue limits
* use send_result to better track state
* nits
* clippy
1 parent 3eaad37 commit fa09de3

File tree

2 files changed

+78
-26
lines changed

2 files changed

+78
-26
lines changed

src/sinks/kafka/service.rs

+77-18
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,18 @@
1-
use std::task::{Context, Poll};
1+
use std::{
2+
sync::{
3+
atomic::{AtomicUsize, Ordering},
4+
Arc,
5+
},
6+
task::{Context, Poll},
7+
time::Duration,
8+
};
29

310
use bytes::Bytes;
411
use rdkafka::{
512
error::KafkaError,
613
message::OwnedHeaders,
714
producer::{FutureProducer, FutureRecord},
8-
util::Timeout,
15+
types::RDKafkaErrorCode,
916
};
1017

1118
use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};
@@ -59,16 +66,38 @@ impl MetaDescriptive for KafkaRequest {
5966
}
6067
}
6168

69+
/// BlockedRecordState manages state for a record blocked from being enqueued on the producer.
70+
struct BlockedRecordState {
71+
records_blocked: Arc<AtomicUsize>,
72+
}
73+
74+
impl BlockedRecordState {
75+
fn new(records_blocked: Arc<AtomicUsize>) -> Self {
76+
records_blocked.fetch_add(1, Ordering::Relaxed);
77+
Self { records_blocked }
78+
}
79+
}
80+
81+
impl Drop for BlockedRecordState {
82+
fn drop(&mut self) {
83+
self.records_blocked.fetch_sub(1, Ordering::Relaxed);
84+
}
85+
}
86+
6287
#[derive(Clone)]
6388
pub struct KafkaService {
6489
kafka_producer: FutureProducer<KafkaStatisticsContext>,
90+
91+
/// The number of records blocked from being enqueued on the producer.
92+
records_blocked: Arc<AtomicUsize>,
6593
}
6694

6795
impl KafkaService {
68-
pub(crate) const fn new(
69-
kafka_producer: FutureProducer<KafkaStatisticsContext>,
70-
) -> KafkaService {
71-
KafkaService { kafka_producer }
96+
pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
97+
KafkaService {
98+
kafka_producer,
99+
records_blocked: Arc::new(AtomicUsize::new(0)),
100+
}
72101
}
73102
}
74103

@@ -78,13 +107,21 @@ impl Service<KafkaRequest> for KafkaService {
78107
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
79108

80109
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81-
Poll::Ready(Ok(()))
110+
// The Kafka service is at capacity if any records are currently blocked from being enqueued
111+
// on the producer.
112+
if self.records_blocked.load(Ordering::Relaxed) > 0 {
113+
Poll::Pending
114+
} else {
115+
Poll::Ready(Ok(()))
116+
}
82117
}
83118

84119
fn call(&mut self, request: KafkaRequest) -> Self::Future {
85120
let this = self.clone();
86121

87122
Box::pin(async move {
123+
let raw_byte_size =
124+
request.body.len() + request.metadata.key.as_ref().map_or(0, |x| x.len());
88125
let event_byte_size = request
89126
.request_metadata
90127
.into_events_estimated_json_encoded_byte_size();
@@ -101,17 +138,39 @@ impl Service<KafkaRequest> for KafkaService {
101138
record = record.headers(headers);
102139
}
103140

104-
// rdkafka will internally retry forever if the queue is full
105-
match this.kafka_producer.send(record, Timeout::Never).await {
106-
Ok((_partition, _offset)) => {
107-
let raw_byte_size =
108-
request.body.len() + request.metadata.key.map_or(0, |x| x.len());
109-
Ok(KafkaResponse {
110-
event_byte_size,
111-
raw_byte_size,
112-
})
113-
}
114-
Err((kafka_err, _original_record)) => Err(kafka_err),
141+
// Manually poll [FutureProducer::send_result] instead of [FutureProducer::send] to track
142+
// records that fail to be enqueued on the producer.
143+
let mut blocked_state: Option<BlockedRecordState> = None;
144+
loop {
145+
match this.kafka_producer.send_result(record) {
146+
// Record was successfully enqueued on the producer.
147+
Ok(fut) => {
148+
// Drop the blocked state (if any), as the producer is no longer blocked.
149+
drop(blocked_state.take());
150+
return fut
151+
.await
152+
.expect("producer unexpectedly dropped")
153+
.map(|_| KafkaResponse {
154+
event_byte_size,
155+
raw_byte_size,
156+
})
157+
.map_err(|(err, _)| err);
158+
}
159+
// Producer queue is full.
160+
Err((
161+
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
162+
original_record,
163+
)) => {
164+
if blocked_state.is_none() {
165+
blocked_state =
166+
Some(BlockedRecordState::new(Arc::clone(&this.records_blocked)));
167+
}
168+
record = original_record;
169+
tokio::time::sleep(Duration::from_millis(100)).await;
170+
}
171+
// A different error occurred.
172+
Err((err, _)) => return Err(err),
173+
};
115174
}
116175
})
117176
}

src/sinks/kafka/sink.rs

+1-8
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ use rdkafka::{
66
};
77
use snafu::{ResultExt, Snafu};
88
use tokio::time::Duration;
9-
use tower::limit::ConcurrencyLimit;
109
use vrl::path::OwnedTargetPath;
1110

1211
use super::config::{KafkaRole, KafkaSinkConfig};
@@ -62,11 +61,6 @@ impl KafkaSink {
6261
}
6362

6463
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
65-
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing.
66-
// 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
67-
// buffer is full.
68-
let service = ConcurrencyLimit::new(self.service.clone(), 64);
69-
7064
let request_builder = KafkaRequestBuilder {
7165
key_field: self.key_field,
7266
headers_key: self.headers_key,
@@ -100,8 +94,7 @@ impl KafkaSink {
10094
Ok(req) => Some(req),
10195
}
10296
})
103-
.into_driver(service)
104-
.protocol("kafka")
97+
.into_driver(self.service)
10598
.protocol("kafka")
10699
.run()
107100
.await

0 commit comments

Comments
 (0)