Skip to content

Commit d7c27eb

Browse files
authored
enhancement(aws_s3 source): optionally configure source to defer or delete older notifications (#22691)
* make configurable to defer or delete old messages in queue * formatting * update docs * name field more inline with other fields (poll_secs) * add changelog * avoid unwrap * rename to match config of deferred instead of retry * nest deferred configurations under deferred. require them to be together * correct changelog * add ticks to emphasize the variable names * remove panic. update send_message to require a target queue_url * internal_log_rate_limit & fmt
1 parent 8861538 commit d7c27eb

File tree

4 files changed

+317
-7
lines changed

4 files changed

+317
-7
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add deferred processing options (`deferred.max_age_secs`, `deferred.queue_url`) to automatically route older event notifications to a separate queue, enabling prioritized processing of recent files
2+
3+
authors: akutta

src/internal_events/aws_sqs.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vector_lib::internal_event::{error_stage, error_type};
99
mod s3 {
1010
use aws_sdk_sqs::types::{
1111
BatchResultErrorEntry, DeleteMessageBatchRequestEntry, DeleteMessageBatchResultEntry,
12+
SendMessageBatchRequestEntry, SendMessageBatchResultEntry,
1213
};
1314

1415
use super::*;
@@ -114,6 +115,80 @@ mod s3 {
114115
.increment(1);
115116
}
116117
}
118+
119+
#[derive(Debug)]
120+
pub struct SqsMessageSentSucceeded {
121+
pub message_ids: Vec<SendMessageBatchResultEntry>,
122+
}
123+
124+
impl InternalEvent for SqsMessageSentSucceeded {
125+
fn emit(self) {
126+
trace!(message = "Deferred SQS message(s).",
127+
message_ids = %self.message_ids.iter()
128+
.map(|x| x.id.as_str())
129+
.collect::<Vec<_>>()
130+
.join(", "));
131+
counter!("sqs_message_defer_succeeded_total").increment(self.message_ids.len() as u64);
132+
}
133+
}
134+
135+
#[derive(Debug)]
136+
pub struct SqsMessageSentPartialError {
137+
pub entries: Vec<BatchResultErrorEntry>,
138+
}
139+
140+
impl InternalEvent for SqsMessageSentPartialError {
141+
fn emit(self) {
142+
error!(
143+
message = "Sending of deferred SQS message(s) failed.",
144+
message_ids = %self.entries.iter()
145+
.map(|x| format!("{}/{}", x.id, x.code))
146+
.collect::<Vec<_>>()
147+
.join(", "),
148+
error_code = "failed_deferring_some_sqs_messages",
149+
error_type = error_type::ACKNOWLEDGMENT_FAILED,
150+
stage = error_stage::PROCESSING,
151+
internal_log_rate_limit = true,
152+
);
153+
counter!(
154+
"component_errors_total",
155+
"error_code" => "failed_deferring_some_sqs_messages",
156+
"error_type" => error_type::ACKNOWLEDGMENT_FAILED,
157+
"stage" => error_stage::PROCESSING,
158+
)
159+
.increment(1);
160+
}
161+
}
162+
163+
#[derive(Debug)]
164+
pub struct SqsMessageSendBatchError<E> {
165+
pub entries: Vec<SendMessageBatchRequestEntry>,
166+
pub error: E,
167+
}
168+
169+
impl<E: std::fmt::Display> InternalEvent for SqsMessageSendBatchError<E> {
170+
fn emit(self) {
171+
error!(
172+
message = "Sending of deferred SQS message(s) failed.",
173+
message_ids = %self.entries.iter()
174+
.map(|x| x.id.as_str())
175+
.collect::<Vec<_>>()
176+
.join(", "),
177+
error = %self.error,
178+
error_code = "failed_deferring_all_sqs_messages",
179+
error_type = error_type::ACKNOWLEDGMENT_FAILED,
180+
stage = error_stage::PROCESSING,
181+
internal_log_rate_limit = true,
182+
);
183+
counter!(
184+
"component_errors_total",
185+
"error_code" => "failed_deferring_all_sqs_messages",
186+
"error_type" => error_type::ACKNOWLEDGMENT_FAILED,
187+
"stage" => error_stage::PROCESSING,
188+
)
189+
.increment(1);
190+
}
191+
}
117192
}
118193

119194
#[derive(Debug)]

0 commit comments

Comments
 (0)