Skip to content

Commit 6196a2b

Browse files
committed
add dead letter fallback
1 parent c2abea9 commit 6196a2b

File tree

6 files changed

+45
-20
lines changed

6 files changed

+45
-20
lines changed

src/sinks/aws_s3/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ impl S3SinkConfig {
224224
.map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
225225
.transpose()?;
226226

227-
let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id);
227+
let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
228228

229229
let transformer = self.encoding.transformer();
230230
let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;

src/sinks/azure_blob/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,6 @@ impl AzureBlobSinkConfig {
264264
}
265265

266266
pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
267-
Ok(KeyPartitioner::new(self.blob_prefix.clone()))
267+
Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
268268
}
269269
}

src/sinks/gcp/cloud_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ impl GcsSinkConfig {
284284
Ok(KeyPartitioner::new(
285285
Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
286286
.context(KeyPrefixTemplateSnafu)?,
287+
None,
287288
))
288289
}
289290
}

src/sinks/s3_common/partitioner.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@ pub struct S3PartitionKey {
99
}
1010

1111
/// Partitions items based on the generated key for the given event.
12-
pub struct S3KeyPartitioner(Template, Option<Template>);
12+
pub struct S3KeyPartitioner(Template, Option<Template>, Option<String>);
1313

1414
impl S3KeyPartitioner {
1515
pub const fn new(
1616
key_prefix_template: Template,
1717
ssekms_key_id_template: Option<Template>,
18+
dead_letter_key_prefix: Option<String>,
1819
) -> Self {
19-
Self(key_prefix_template, ssekms_key_id_template)
20+
Self(
21+
key_prefix_template,
22+
ssekms_key_id_template,
23+
dead_letter_key_prefix,
24+
)
2025
}
2126
}
2227

@@ -28,14 +33,24 @@ impl Partitioner for S3KeyPartitioner {
2833
let key_prefix = self
2934
.0
3035
.render_string(item)
31-
.map_err(|error| {
32-
emit!(TemplateRenderingError {
33-
error,
34-
field: Some("key_prefix"),
35-
drop_event: true,
36-
});
36+
.or_else(|error| {
37+
if let Some(dead_letter_key_prefix) = &self.2 {
38+
emit!(TemplateRenderingError {
39+
error,
40+
field: Some("key_prefix"),
41+
drop_event: false,
42+
});
43+
Ok(dead_letter_key_prefix.clone())
44+
} else {
45+
Err(emit!(TemplateRenderingError {
46+
error,
47+
field: Some("key_prefix"),
48+
drop_event: true,
49+
}))
50+
}
3751
})
3852
.ok()?;
53+
3954
let ssekms_key_id = self
4055
.1
4156
.as_ref()

src/sinks/util/partitioner.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use vector_lib::{event::Event, partition::Partitioner};
33
use crate::{internal_events::TemplateRenderingError, template::Template};
44

55
/// Partitions items based on the generated key for the given event.
6-
pub struct KeyPartitioner(Template);
6+
pub struct KeyPartitioner(Template, Option<String>);
77

88
impl KeyPartitioner {
9-
pub const fn new(template: Template) -> Self {
10-
Self(template)
9+
pub const fn new(template: Template, dead_letter_key_prefix: Option<String>) -> Self {
10+
Self(template, dead_letter_key_prefix)
1111
}
1212
}
1313

@@ -18,12 +18,21 @@ impl Partitioner for KeyPartitioner {
1818
fn partition(&self, item: &Self::Item) -> Self::Key {
1919
self.0
2020
.render_string(item)
21-
.map_err(|error| {
22-
emit!(TemplateRenderingError {
23-
error,
24-
field: Some("key_prefix"),
25-
drop_event: true,
26-
});
21+
.or_else(|error| {
22+
if let Some(dead_letter_key_prefix) = &self.1 {
23+
emit!(TemplateRenderingError {
24+
error,
25+
field: Some("key_prefix"),
26+
drop_event: false,
27+
});
28+
Ok(dead_letter_key_prefix.clone())
29+
} else {
30+
Err(emit!(TemplateRenderingError {
31+
error,
32+
field: Some("key_prefix"),
33+
drop_event: true,
34+
}))
35+
}
2736
})
2837
.ok()
2938
}

src/sinks/webhdfs/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,6 @@ impl WebHdfsConfig {
160160

161161
pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
162162
let prefix = self.prefix.clone().try_into()?;
163-
Ok(KeyPartitioner::new(prefix))
163+
Ok(KeyPartitioner::new(prefix, None))
164164
}
165165
}

0 commit comments

Comments
 (0)