Skip to content

Commit 7b2bddc

Browse files
wochingeArzelaAscoIineuronull
authored
feat(new sink): add AWS Simple Notification Service aws_sns sink (vectordotdev#18141)
* ci: speed up tests by only running sqs tests * chore: prepare split in sqs and sns * chore: wrap up split for foundation * refactor: separate config * refactor: implement separate publisher * refactor: drop no longer needed message builder * refactor: extract error type * style: name variable correctly * feat: add sns 🎉 * refactor: abstract retry logic * chore: remove sqs from shared modules * chore: cleanup * chore: drop temporary changes * chore: update mod to include sns module * refactor: move healthcheck out of config into separate function * refactor: simplify request builder setup * refactor: message id * nit: make functions just visibile in sub module * fix: add pub(super) * fix: sub methods attributes pub(super) * Update Cargo.toml Co-authored-by: neuronull <[email protected]> * Update src/sinks/aws_s_s/mod.rs Co-authored-by: neuronull <[email protected]> * Update src/sinks/aws_s_s/retry.rs Co-authored-by: neuronull <[email protected]> * Update src/sinks/mod.rs Co-authored-by: neuronull <[email protected]> * Update src/sinks/aws_s_s/sns/config.rs Co-authored-by: neuronull <[email protected]> * chore: first bunch of comments * chore: second bunch of comments * chore: enable all integration tests * fix: move test * fix: dead_code warning * fix: dead code warning * docs: autogenerated aws_sns docs * fix: move region serde to downstream impl * update licenses --------- Co-authored-by: Kristof Herrmann <[email protected]> Co-authored-by: ArzelaAscoIi <[email protected]> Co-authored-by: neuronull <[email protected]>
1 parent 833ac19 commit 7b2bddc

26 files changed

+1464
-341
lines changed

Cargo.lock

+26
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ metrics-tracing-context = { version = "0.14.0", default-features = false }
172172
# depending on a fork to circumvent https://github.com/awslabs/aws-sdk-rust/issues/749
173173
aws-sdk-s3 = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
174174
aws-sdk-sqs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
175+
aws-sdk-sns = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
175176
aws-sdk-cloudwatch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
176177
aws-sdk-cloudwatchlogs = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
177178
aws-sdk-elasticsearch = { git = "https://github.com/vectordotdev/aws-sdk-rust", rev = "3d6aefb7fcfced5fc2a7e761a87e4ddbda1ee670", default-features = false, features = ["native-tls"], optional = true }
@@ -622,6 +623,7 @@ sinks-logs = [
622623
"sinks-aws_kinesis_streams",
623624
"sinks-aws_s3",
624625
"sinks-aws_sqs",
626+
"sinks-aws_sns",
625627
"sinks-axiom",
626628
"sinks-azure_blob",
627629
"sinks-azure_monitor_logs",
@@ -681,6 +683,7 @@ sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
681683
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
682684
sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
683685
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
686+
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
684687
sinks-axiom = ["sinks-elasticsearch"]
685688
sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"]
686689
sinks-azure_monitor_logs = []
@@ -789,6 +792,7 @@ aws-integration-tests = [
789792
"aws-kinesis-streams-integration-tests",
790793
"aws-s3-integration-tests",
791794
"aws-sqs-integration-tests",
795+
"aws-sns-integration-tests",
792796
]
793797

794798
azure-integration-tests = [
@@ -802,7 +806,8 @@ aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
802806
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "dep:aws-sdk-elasticsearch", "sinks-elasticsearch"]
803807
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
804808
aws-s3-integration-tests = ["sinks-aws_s3", "sources-aws_s3"]
805-
aws-sqs-integration-tests = ["sinks-aws_sqs", "sources-aws_sqs"]
809+
aws-sqs-integration-tests = ["sinks-aws_sqs"]
810+
aws-sns-integration-tests = ["sinks-aws_sns"]
806811
axiom-integration-tests = ["sinks-axiom"]
807812
azure-blob-integration-tests = ["sinks-azure_blob"]
808813
chronicle-integration-tests = ["sinks-gcp"]

LICENSE-3rdparty.csv

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS R
5454
aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
5555
aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
5656
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
57+
aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
5758
aws-sdk-sqs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
5859
aws-sdk-sso,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
5960
aws-sdk-sts,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"

scripts/integration/aws/compose.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ services:
66
mock-localstack:
77
image: docker.io/localstack/localstack-full:0.11.6
88
environment:
9-
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs
9+
- SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs,sns
1010
mock-watchlogs:
1111
image: docker.io/luciofranco/mockwatchlogs:latest
1212
mock-ecs:

scripts/integration/aws/test.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ env:
1313
KINESIS_ADDRESS: http://mock-localstack:4566
1414
S3_ADDRESS: http://mock-localstack:4566
1515
SQS_ADDRESS: http://mock-localstack:4566
16+
SNS_ADDRESS: http://mock-localstack:4566
1617
WATCHLOGS_ADDRESS: http://mock-watchlogs:6000
1718

1819
matrix:

src/sinks/aws_s_s/client.rs

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
2+
use aws_sdk_sqs::types::SdkError;
3+
4+
#[async_trait::async_trait]
5+
pub(super) trait Client<R>
6+
where
7+
R: std::fmt::Debug + std::fmt::Display + std::error::Error,
8+
{
9+
async fn send_message(
10+
&self,
11+
entry: SendMessageEntry,
12+
byte_size: usize,
13+
) -> Result<SendMessageResponse, SdkError<R>>;
14+
}

src/sinks/aws_s_s/config.rs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::convert::TryFrom;
2+
3+
use snafu::{ResultExt, Snafu};
4+
5+
use vector_config::configurable_component;
6+
7+
use crate::{
8+
aws::AwsAuthentication,
9+
codecs::EncodingConfig,
10+
config::AcknowledgementsConfig,
11+
sinks::util::TowerRequestConfig,
12+
template::{Template, TemplateParseError},
13+
tls::TlsConfig,
14+
};
15+
16+
#[derive(Debug, Snafu)]
17+
pub(super) enum BuildError {
18+
#[snafu(display("`message_group_id` should be defined for FIFO queue."))]
19+
MessageGroupIdMissing,
20+
#[snafu(display("`message_group_id` is not allowed with non-FIFO queue."))]
21+
MessageGroupIdNotAllowed,
22+
#[snafu(display("invalid topic template: {}", source))]
23+
TopicTemplate { source: TemplateParseError },
24+
#[snafu(display("invalid message_deduplication_id template: {}", source))]
25+
MessageDeduplicationIdTemplate { source: TemplateParseError },
26+
}
27+
28+
/// Base Configuration `aws_s_s` for sns and sqs sink.
29+
#[configurable_component]
30+
#[derive(Clone, Debug)]
31+
#[serde(deny_unknown_fields)]
32+
pub(super) struct BaseSSSinkConfig {
33+
#[configurable(derived)]
34+
pub(super) encoding: EncodingConfig,
35+
36+
/// The tag that specifies that a message belongs to a specific message group.
37+
///
38+
/// Can be applied only to FIFO queues.
39+
#[configurable(metadata(docs::examples = "vector"))]
40+
#[configurable(metadata(docs::examples = "vector-%Y-%m-%d"))]
41+
pub(super) message_group_id: Option<String>,
42+
43+
/// The message deduplication ID value to allow AWS to identify duplicate messages.
44+
///
45+
/// This value is a template which should result in a unique string for each event. See the [AWS
46+
/// documentation][deduplication_id_docs] for more about how AWS does message deduplication.
47+
///
48+
/// [deduplication_id_docs]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
49+
#[configurable(metadata(docs::examples = "{{ transaction_id }}"))]
50+
pub(super) message_deduplication_id: Option<String>,
51+
52+
#[configurable(derived)]
53+
#[serde(default)]
54+
pub(super) request: TowerRequestConfig,
55+
56+
#[configurable(derived)]
57+
pub(super) tls: Option<TlsConfig>,
58+
59+
/// The ARN of an [IAM role][iam_role] to assume at startup.
60+
///
61+
/// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
62+
#[configurable(deprecated)]
63+
#[configurable(metadata(docs::hidden))]
64+
pub(super) assume_role: Option<String>,
65+
66+
#[configurable(derived)]
67+
#[serde(default)]
68+
pub(super) auth: AwsAuthentication,
69+
70+
#[configurable(derived)]
71+
#[serde(
72+
default,
73+
deserialize_with = "crate::serde::bool_or_struct",
74+
skip_serializing_if = "crate::serde::skip_serializing_if_default"
75+
)]
76+
pub(super) acknowledgements: AcknowledgementsConfig,
77+
}
78+
79+
pub(super) fn message_group_id(
80+
message_group_id: Option<String>,
81+
fifo: bool,
82+
) -> crate::Result<Option<Template>> {
83+
match (message_group_id.as_ref(), fifo) {
84+
(Some(value), true) => Ok(Some(
85+
Template::try_from(value.clone()).context(TopicTemplateSnafu)?,
86+
)),
87+
(Some(_), false) => Err(Box::new(BuildError::MessageGroupIdNotAllowed)),
88+
(None, true) => Err(Box::new(BuildError::MessageGroupIdMissing)),
89+
(None, false) => Ok(None),
90+
}
91+
}
92+
pub(super) fn message_deduplication_id(
93+
message_deduplication_id: Option<String>,
94+
) -> crate::Result<Option<Template>> {
95+
Ok(message_deduplication_id
96+
.clone()
97+
.map(Template::try_from)
98+
.transpose()?)
99+
}

src/sinks/aws_s_s/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
mod client;
2+
mod config;
3+
mod request_builder;
4+
mod retry;
5+
mod service;
6+
mod sink;
7+
8+
#[cfg(feature = "sinks-aws_sqs")]
9+
mod sqs;
10+
11+
#[cfg(feature = "sinks-aws_sns")]
12+
mod sns;

src/sinks/aws_sqs/request_builder.rs renamed to src/sinks/aws_s_s/request_builder.rs

+30-30
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use bytes::Bytes;
22
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};
33
use vector_core::ByteSizeOf;
44

5-
use super::config::SqsSinkConfig;
5+
use crate::codecs::EncodingConfig;
66
use crate::{
77
codecs::{Encoder, Transformer},
88
event::{Event, EventFinalizers, Finalizable},
@@ -15,37 +15,39 @@ use crate::{
1515
};
1616

1717
#[derive(Clone)]
18-
pub struct SqsMetadata {
19-
pub finalizers: EventFinalizers,
20-
pub message_group_id: Option<String>,
21-
pub message_deduplication_id: Option<String>,
18+
pub(super) struct SSMetadata {
19+
pub(super) finalizers: EventFinalizers,
20+
pub(super) message_group_id: Option<String>,
21+
pub(super) message_deduplication_id: Option<String>,
2222
}
2323

2424
#[derive(Clone)]
25-
pub(crate) struct SqsRequestBuilder {
25+
pub(super) struct SSRequestBuilder {
2626
encoder: (Transformer, Encoder<()>),
2727
message_group_id: Option<Template>,
2828
message_deduplication_id: Option<Template>,
29-
queue_url: String,
3029
}
3130

32-
impl SqsRequestBuilder {
33-
pub fn new(config: SqsSinkConfig) -> crate::Result<Self> {
34-
let transformer = config.encoding.transformer();
35-
let serializer = config.encoding.build()?;
31+
impl SSRequestBuilder {
32+
pub(super) fn new(
33+
message_group_id: Option<Template>,
34+
message_deduplication_id: Option<Template>,
35+
encoding_config: EncodingConfig,
36+
) -> crate::Result<Self> {
37+
let transformer = encoding_config.transformer();
38+
let serializer = encoding_config.build()?;
3639
let encoder = Encoder::<()>::new(serializer);
3740

3841
Ok(Self {
3942
encoder: (transformer, encoder),
40-
message_group_id: config.message_group_id()?,
41-
message_deduplication_id: config.message_deduplication_id()?,
42-
queue_url: config.queue_url,
43+
message_group_id,
44+
message_deduplication_id,
4345
})
4446
}
4547
}
4648

47-
impl RequestBuilder<Event> for SqsRequestBuilder {
48-
type Metadata = SqsMetadata;
49+
impl RequestBuilder<Event> for SSRequestBuilder {
50+
type Metadata = SSMetadata;
4951
type Events = Event;
5052
type Encoder = (Transformer, Encoder<()>);
5153
type Payload = Bytes;
@@ -95,17 +97,17 @@ impl RequestBuilder<Event> for SqsRequestBuilder {
9597

9698
let builder = RequestMetadataBuilder::from_event(&event);
9799

98-
let sqs_metadata = SqsMetadata {
100+
let metadata = SSMetadata {
99101
finalizers: event.take_finalizers(),
100102
message_group_id,
101103
message_deduplication_id,
102104
};
103-
(sqs_metadata, builder, event)
105+
(metadata, builder, event)
104106
}
105107

106108
fn build_request(
107109
&self,
108-
sqs_metadata: Self::Metadata,
110+
client_metadata: Self::Metadata,
109111
metadata: RequestMetadata,
110112
payload: EncodeResult<Self::Payload>,
111113
) -> Self::Request {
@@ -114,23 +116,21 @@ impl RequestBuilder<Event> for SqsRequestBuilder {
114116

115117
SendMessageEntry {
116118
message_body,
117-
message_group_id: sqs_metadata.message_group_id,
118-
message_deduplication_id: sqs_metadata.message_deduplication_id,
119-
queue_url: self.queue_url.clone(),
120-
finalizers: sqs_metadata.finalizers,
119+
message_group_id: client_metadata.message_group_id,
120+
message_deduplication_id: client_metadata.message_deduplication_id,
121+
finalizers: client_metadata.finalizers,
121122
metadata,
122123
}
123124
}
124125
}
125126

126127
#[derive(Debug, Clone)]
127-
pub(crate) struct SendMessageEntry {
128-
pub message_body: String,
129-
pub message_group_id: Option<String>,
130-
pub message_deduplication_id: Option<String>,
131-
pub queue_url: String,
132-
finalizers: EventFinalizers,
133-
pub metadata: RequestMetadata,
128+
pub(super) struct SendMessageEntry {
129+
pub(super) message_body: String,
130+
pub(super) message_group_id: Option<String>,
131+
pub(super) message_deduplication_id: Option<String>,
132+
pub(super) finalizers: EventFinalizers,
133+
pub(super) metadata: RequestMetadata,
134134
}
135135

136136
impl ByteSizeOf for SendMessageEntry {

src/sinks/aws_s_s/retry.rs

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use aws_sdk_sqs::types::SdkError;
2+
use std::marker::PhantomData;
3+
4+
use super::service::SendMessageResponse;
5+
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};
6+
7+
#[derive(Debug)]
8+
pub(super) struct SSRetryLogic<E> {
9+
_phantom: PhantomData<fn() -> E>,
10+
}
11+
12+
impl<E> SSRetryLogic<E>
13+
where
14+
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
15+
{
16+
pub(super) fn new() -> SSRetryLogic<E> {
17+
Self {
18+
_phantom: PhantomData,
19+
}
20+
}
21+
}
22+
23+
impl<E> RetryLogic for SSRetryLogic<E>
24+
where
25+
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
26+
{
27+
type Error = SdkError<E>;
28+
type Response = SendMessageResponse;
29+
30+
fn is_retriable_error(&self, error: &Self::Error) -> bool {
31+
is_retriable_error(error)
32+
}
33+
}
34+
35+
impl<E> Clone for SSRetryLogic<E>
36+
where
37+
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
38+
{
39+
fn clone(&self) -> SSRetryLogic<E> {
40+
SSRetryLogic {
41+
_phantom: PhantomData,
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)