Skip to content

Commit bebac21

Browse files
dengmingtongJason Goodwinjasongoodwinspencergilbert
authored
feat(kinesis sinks): implement full retry of partial failures in firehose/streams (vectordotdev#17535)
This PR is from vectordotdev#16771 PR. Refactor some action checking. closes: vectordotdev#17424 --------- Signed-off-by: Spencer Gilbert <[email protected]> Co-authored-by: Jason Goodwin <[email protected]> Co-authored-by: Jason Goodwin <[email protected]> Co-authored-by: Spencer Gilbert <[email protected]>
1 parent 2ad964d commit bebac21

File tree

13 files changed

+103
-23
lines changed

13 files changed

+103
-23
lines changed

src/sinks/aws_kinesis/config.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ pub struct KinesisSinkBaseConfig {
5252
#[serde(default)]
5353
pub auth: AwsAuthentication,
5454

55+
/// Whether or not to retry successful requests containing partial failures.
56+
#[serde(default)]
57+
#[configurable(metadata(docs::advanced))]
58+
pub request_retry_partial: bool,
59+
5560
#[configurable(derived)]
5661
#[serde(
5762
default,
@@ -77,6 +82,7 @@ pub fn build_sink<C, R, RR, E, RT>(
7782
partition_key_field: Option<String>,
7883
batch_settings: BatcherSettings,
7984
client: C,
85+
retry_logic: RT,
8086
) -> crate::Result<VectorSink>
8187
where
8288
C: SendRecord + Clone + Send + Sync + 'static,
@@ -92,7 +98,7 @@ where
9298

9399
let region = config.region.region();
94100
let service = ServiceBuilder::new()
95-
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, RT::default())
101+
.settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
96102
.service(KinesisService::<C, R, E> {
97103
client,
98104
stream_name: config.stream_name.clone(),

src/sinks/aws_kinesis/firehose/config.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures::FutureExt;
88
use snafu::Snafu;
99
use vector_config::configurable_component;
1010

11+
use crate::sinks::util::retries::RetryAction;
1112
use crate::{
1213
aws::{create_client, is_retriable_error, ClientBuilder},
1314
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
@@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
141142
None,
142143
batch_settings,
143144
KinesisFirehoseClient { client },
145+
KinesisRetryLogic {
146+
retry_partial: self.base.request_retry_partial,
147+
},
144148
)?;
145149

146150
Ok((sink, healthcheck))
@@ -166,7 +170,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig {
166170
}
167171

168172
#[derive(Clone, Default)]
169-
struct KinesisRetryLogic;
173+
struct KinesisRetryLogic {
174+
retry_partial: bool,
175+
}
170176

171177
impl RetryLogic for KinesisRetryLogic {
172178
type Error = SdkError<KinesisError>;
@@ -180,4 +186,13 @@ impl RetryLogic for KinesisRetryLogic {
180186
}
181187
is_retriable_error(error)
182188
}
189+
190+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
191+
if response.failure_count > 0 && self.retry_partial {
192+
let msg = format!("partial error count {}", response.failure_count);
193+
RetryAction::Retry(msg.into())
194+
} else {
195+
RetryAction::Successful
196+
}
197+
}
183198
}

src/sinks/aws_kinesis/firehose/integration_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ async fn firehose_put_records() {
5757
tls: None,
5858
auth: Default::default(),
5959
acknowledgements: Default::default(),
60+
request_retry_partial: Default::default(),
6061
};
6162

6263
let config = KinesisFirehoseSinkConfig { batch, base };

src/sinks/aws_kinesis/firehose/record.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use aws_sdk_firehose::output::PutRecordBatchOutput;
12
use aws_sdk_firehose::types::{Blob, SdkError};
23
use bytes::Bytes;
34
use tracing::Instrument;
45

5-
use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
6+
use crate::sinks::prelude::*;
7+
8+
use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
69

710
#[derive(Clone)]
811
pub struct KinesisFirehoseRecord {
@@ -46,14 +49,26 @@ impl SendRecord for KinesisFirehoseClient {
4649
type T = KinesisRecord;
4750
type E = KinesisError;
4851

49-
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
52+
async fn send(
53+
&self,
54+
records: Vec<Self::T>,
55+
stream_name: String,
56+
) -> Result<KinesisResponse, SdkError<Self::E>> {
57+
let rec_count = records.len();
58+
let total_size = records.iter().fold(0, |acc, record| {
59+
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
60+
});
5061
self.client
5162
.put_record_batch()
5263
.set_records(Some(records))
5364
.delivery_stream_name(stream_name)
5465
.send()
5566
.instrument(info_span!("request").or_current())
5667
.await
57-
.err()
68+
.map(|output: PutRecordBatchOutput| KinesisResponse {
69+
count: rec_count,
70+
failure_count: output.failed_put_count().unwrap_or(0) as usize,
71+
events_byte_size: JsonSize::new(total_size),
72+
})
5873
}
5974
}

src/sinks/aws_kinesis/firehose/tests.rs

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ async fn check_batch_size() {
3333
request: Default::default(),
3434
tls: None,
3535
auth: Default::default(),
36+
request_retry_partial: false,
3637
acknowledgements: Default::default(),
3738
};
3839

@@ -62,6 +63,7 @@ async fn check_batch_events() {
6263
request: Default::default(),
6364
tls: None,
6465
auth: Default::default(),
66+
request_retry_partial: false,
6567
acknowledgements: Default::default(),
6668
};
6769

src/sinks/aws_kinesis/record.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use async_trait::async_trait;
22
use aws_smithy_client::SdkError;
33
use bytes::Bytes;
44

5+
use super::KinesisResponse;
56
/// An AWS Kinesis record type primarily to store the underlying aws crates' actual record `T`, and
67
/// to abstract the encoded length calculation.
78
pub trait Record {
@@ -24,5 +25,9 @@ pub trait SendRecord {
2425
type E;
2526

2627
/// Sends the records.
27-
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>>;
28+
async fn send(
29+
&self,
30+
records: Vec<Self::T>,
31+
stream_name: String,
32+
) -> Result<KinesisResponse, SdkError<Self::E>>;
2833
}

src/sinks/aws_kinesis/service.rs

+7-13
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ where
3737
}
3838

3939
pub struct KinesisResponse {
40-
count: usize,
41-
events_byte_size: JsonSize,
40+
pub(crate) count: usize,
41+
pub(crate) failure_count: usize,
42+
pub(crate) events_byte_size: JsonSize,
4243
}
4344

4445
impl DriverResponse for KinesisResponse {
@@ -72,7 +73,6 @@ where
7273
let events_byte_size = requests
7374
.get_metadata()
7475
.events_estimated_json_encoded_byte_size();
75-
let count = requests.get_metadata().event_count();
7676

7777
let records = requests
7878
.events
@@ -84,16 +84,10 @@ where
8484
let stream_name = self.stream_name.clone();
8585

8686
Box::pin(async move {
87-
// Returning a Result (a trait that implements Try) is not a stable feature,
88-
// so instead we have to explicitly check for error and return.
89-
// https://github.com/rust-lang/rust/issues/84277
90-
if let Some(e) = client.send(records, stream_name).await {
91-
return Err(e);
92-
}
93-
94-
Ok(KinesisResponse {
95-
count,
96-
events_byte_size,
87+
client.send(records, stream_name).await.map(|mut r| {
88+
// augment the response
89+
r.events_byte_size = events_byte_size;
90+
r
9791
})
9892
})
9993
}

src/sinks/aws_kinesis/streams/config.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use futures::FutureExt;
66
use snafu::Snafu;
77
use vector_config::{component::GenerateConfig, configurable_component};
88

9+
use crate::sinks::util::retries::RetryAction;
910
use crate::{
1011
aws::{create_client, is_retriable_error, ClientBuilder},
1112
config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
@@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig {
148149
self.partition_key_field.clone(),
149150
batch_settings,
150151
KinesisStreamClient { client },
152+
KinesisRetryLogic {
153+
retry_partial: self.base.request_retry_partial,
154+
},
151155
)?;
152156

153157
Ok((sink, healthcheck))
@@ -173,7 +177,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig {
173177
}
174178
}
175179
#[derive(Default, Clone)]
176-
struct KinesisRetryLogic;
180+
struct KinesisRetryLogic {
181+
retry_partial: bool,
182+
}
177183

178184
impl RetryLogic for KinesisRetryLogic {
179185
type Error = SdkError<KinesisError>;
@@ -193,6 +199,15 @@ impl RetryLogic for KinesisRetryLogic {
193199
}
194200
is_retriable_error(error)
195201
}
202+
203+
fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
204+
if response.failure_count > 0 && self.retry_partial {
205+
let msg = format!("partial error count {}", response.failure_count);
206+
RetryAction::Retry(msg.into())
207+
} else {
208+
RetryAction::Successful
209+
}
210+
}
196211
}
197212

198213
#[cfg(test)]

src/sinks/aws_kinesis/streams/integration_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ async fn kinesis_put_records_without_partition_key() {
9898
tls: Default::default(),
9999
auth: Default::default(),
100100
acknowledgements: Default::default(),
101+
request_retry_partial: Default::default(),
101102
};
102103

103104
let config = KinesisStreamsSinkConfig {

src/sinks/aws_kinesis/streams/record.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use aws_sdk_kinesis::output::PutRecordsOutput;
12
use aws_sdk_kinesis::types::{Blob, SdkError};
23
use bytes::Bytes;
34
use tracing::Instrument;
45

5-
use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
6+
use crate::sinks::prelude::*;
7+
8+
use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
69

710
#[derive(Clone)]
811
pub struct KinesisStreamRecord {
@@ -62,14 +65,26 @@ impl SendRecord for KinesisStreamClient {
6265
type T = KinesisRecord;
6366
type E = KinesisError;
6467

65-
async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
68+
async fn send(
69+
&self,
70+
records: Vec<Self::T>,
71+
stream_name: String,
72+
) -> Result<KinesisResponse, SdkError<Self::E>> {
73+
let rec_count = records.len();
74+
let total_size = records.iter().fold(0, |acc, record| {
75+
acc + record.data().map(|v| v.as_ref().len()).unwrap_or_default()
76+
});
6677
self.client
6778
.put_records()
6879
.set_records(Some(records))
6980
.stream_name(stream_name)
7081
.send()
7182
.instrument(info_span!("request").or_current())
7283
.await
73-
.err()
84+
.map(|output: PutRecordsOutput| KinesisResponse {
85+
count: rec_count,
86+
failure_count: output.failed_record_count().unwrap_or(0) as usize,
87+
events_byte_size: JsonSize::new(total_size),
88+
})
7489
}
7590
}

website/cue/reference/components/sinks/aws_kinesis_firehose.cue

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ components: sinks: aws_kinesis_firehose: components._aws & {
7575

7676
configuration: base.components.sinks.aws_kinesis_firehose.configuration & {
7777
_aws_include: false
78+
request_retry_partial: warnings: ["This can cause duplicate logs to be published."]
7879
}
7980

8081
input: {

website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue

+5
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
480480
}
481481
}
482482
}
483+
request_retry_partial: {
484+
description: "Whether or not to retry successful requests containing partial failures."
485+
required: false
486+
type: bool: default: false
487+
}
483488
stream_name: {
484489
description: """
485490
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.

website/cue/reference/components/sinks/base/aws_kinesis_streams.cue

+5
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,11 @@ base: components: sinks: aws_kinesis_streams: configuration: {
489489
}
490490
}
491491
}
492+
request_retry_partial: {
493+
description: "Whether or not to retry successful requests containing partial failures."
494+
required: false
495+
type: bool: default: false
496+
}
492497
stream_name: {
493498
description: """
494499
The [stream name][stream_name] of the target Kinesis Firehose delivery stream.

0 commit comments

Comments
 (0)