Skip to content

Commit 986316d

Browse files
committed
feat: separate aws support in es & prometheus sink
1 parent d3a6235 commit 986316d

File tree

8 files changed

+300
-209
lines changed

8 files changed

+300
-209
lines changed

Cargo.lock

+232-194
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ sinks-datadog_events = []
699699
sinks-datadog_logs = []
700700
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
701701
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
702-
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
702+
sinks-elasticsearch = ["transforms-metric_to_log"]
703703
sinks-file = ["dep:async-compression"]
704704
sinks-gcp = ["dep:base64", "gcp"]
705705
sinks-greptimedb = ["dep:greptimedb-client"]
@@ -714,7 +714,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
714714
sinks-new_relic_logs = ["sinks-http"]
715715
sinks-new_relic = []
716716
sinks-papertrail = ["dep:syslog"]
717-
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
717+
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
718718
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
719719
sinks-redis = ["dep:redis"]
720720
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
@@ -821,7 +821,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
821821
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
822822
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
823823
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
824-
es-integration-tests = ["sinks-elasticsearch"]
824+
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
825825
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
826826
fluent-integration-tests = ["docker", "sources-fluent"]
827827
gcp-cloud-storage-integration-tests = ["sinks-gcp"]

src/sinks/elasticsearch/common.rs

+34-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::HashMap;
22

3+
#[cfg(feature = "aws-core")]
34
use aws_credential_types::provider::SharedCredentialsProvider;
5+
#[cfg(feature = "aws-core")]
46
use aws_types::region::Region;
57
use bytes::{Buf, Bytes};
68
use http::{Response, StatusCode, Uri};
@@ -32,10 +34,12 @@ pub struct ElasticsearchCommon {
3234
pub base_url: String,
3335
pub bulk_uri: Uri,
3436
pub http_auth: Option<Auth>,
37+
#[cfg(feature = "aws-core")]
3538
pub aws_auth: Option<SharedCredentialsProvider>,
3639
pub mode: ElasticsearchCommonMode,
3740
pub request_builder: ElasticsearchRequestBuilder,
3841
pub tls_settings: TlsSettings,
42+
#[cfg(feature = "aws-core")]
3943
pub region: Option<Region>,
4044
pub request: RequestConfig,
4145
pub query_params: HashMap<String, String>,
@@ -72,6 +76,7 @@ impl ElasticsearchCommon {
7276
let http_auth = authorization.choose_one(&uri.auth)?;
7377
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
7478

79+
#[cfg(feature = "aws-core")]
7580
let aws_auth = match &config.auth {
7681
Some(ElasticsearchAuth::Basic { .. }) | None => None,
7782
Some(ElasticsearchAuth::Aws(aws)) => {
@@ -126,6 +131,7 @@ impl ElasticsearchCommon {
126131
metric_config.metric_tag_values,
127132
);
128133

134+
#[cfg(feature = "aws-core")]
129135
let region = config.aws.as_ref().and_then(|config| config.region());
130136

131137
let version = if let Some(version) = *version {
@@ -139,7 +145,9 @@ impl ElasticsearchCommon {
139145
match get_version(
140146
&base_url,
141147
&http_auth,
148+
#[cfg(feature = "aws-core")]
142149
&aws_auth,
150+
#[cfg(feature = "aws-core")]
143151
&region,
144152
&request,
145153
&tls_settings,
@@ -198,11 +206,13 @@ impl ElasticsearchCommon {
198206
http_auth,
199207
base_url,
200208
bulk_uri,
209+
#[cfg(feature = "aws-core")]
201210
aws_auth,
202211
mode,
203212
request_builder,
204213
query_params,
205214
request,
215+
#[cfg(feature = "aws-core")]
206216
region,
207217
tls_settings,
208218
metric_to_log,
@@ -249,7 +259,9 @@ impl ElasticsearchCommon {
249259
match get(
250260
&self.base_url,
251261
&self.http_auth,
262+
#[cfg(feature = "aws-core")]
252263
&self.aws_auth,
264+
#[cfg(feature = "aws-core")]
253265
&self.region,
254266
&self.request,
255267
client,
@@ -264,6 +276,7 @@ impl ElasticsearchCommon {
264276
}
265277
}
266278

279+
#[cfg(feature = "aws-core")]
267280
pub async fn sign_request(
268281
request: &mut http::Request<Bytes>,
269282
credentials_provider: &SharedCredentialsProvider,
@@ -275,8 +288,8 @@ pub async fn sign_request(
275288
async fn get_version(
276289
base_url: &str,
277290
http_auth: &Option<Auth>,
278-
aws_auth: &Option<SharedCredentialsProvider>,
279-
region: &Option<Region>,
291+
#[cfg(feature = "aws-core")] aws_auth: &Option<SharedCredentialsProvider>,
292+
#[cfg(feature = "aws-core")] region: &Option<Region>,
280293
request: &RequestConfig,
281294
tls_settings: &TlsSettings,
282295
proxy_config: &ProxyConfig,
@@ -291,9 +304,19 @@ async fn get_version(
291304
}
292305

293306
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
294-
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
295-
.await
296-
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
307+
let response = get(
308+
base_url,
309+
http_auth,
310+
#[cfg(feature = "aws-core")]
311+
aws_auth,
312+
#[cfg(feature = "aws-core")]
313+
region,
314+
request,
315+
client,
316+
"/",
317+
)
318+
.await
319+
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
297320

298321
let (_, body) = response.into_parts();
299322
let mut body = body::aggregate(body).await?;
@@ -315,8 +338,8 @@ async fn get_version(
315338
async fn get(
316339
base_url: &str,
317340
http_auth: &Option<Auth>,
318-
aws_auth: &Option<SharedCredentialsProvider>,
319-
region: &Option<Region>,
341+
#[cfg(feature = "aws-core")] aws_auth: &Option<SharedCredentialsProvider>,
342+
#[cfg(feature = "aws-core")] region: &Option<Region>,
320343
request: &RequestConfig,
321344
client: HttpClient,
322345
path: &str,
@@ -331,8 +354,11 @@ async fn get(
331354
builder = builder.header(&header[..], &value[..]);
332355
}
333356

334-
let mut request = builder.body(Bytes::new())?;
357+
let request = builder.body(Bytes::new())?;
358+
#[cfg(feature = "aws-core")]
359+
let mut request = request;
335360

361+
#[cfg(feature = "aws-core")]
336362
if let Some(credentials_provider) = aws_auth {
337363
sign_request(&mut request, credentials_provider, region).await?;
338364
}

src/sinks/elasticsearch/config.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use std::{
66
use futures::{FutureExt, TryFutureExt};
77
use vector_config::configurable_component;
88

9+
#[cfg(feature = "aws-core")]
10+
use crate::aws::RegionOrEndpoint;
911
use crate::{
10-
aws::RegionOrEndpoint,
1112
codecs::Transformer,
1213
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
1314
event::{EventRef, LogEvent, Value},
@@ -153,6 +154,7 @@ pub struct ElasticsearchConfig {
153154

154155
#[serde(default)]
155156
#[configurable(derived)]
157+
#[cfg(feature = "aws-core")]
156158
pub aws: Option<RegionOrEndpoint>,
157159

158160
#[serde(default)]
@@ -215,6 +217,7 @@ impl Default for ElasticsearchConfig {
215217
request: Default::default(),
216218
auth: None,
217219
query: None,
220+
#[cfg(feature = "aws-core")]
218221
aws: None,
219222
tls: None,
220223
endpoint_health: None,

src/sinks/elasticsearch/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use snafu::Snafu;
2424
use vector_common::sensitive_string::SensitiveString;
2525
use vector_config::configurable_component;
2626

27+
#[cfg(feature = "aws-core")]
2728
use crate::aws::AwsAuthentication;
2829
use crate::{
2930
event::{EventRef, LogEvent},
@@ -50,6 +51,7 @@ pub enum ElasticsearchAuth {
5051
password: SensitiveString,
5152
},
5253

54+
#[cfg(feature = "aws-core")]
5355
/// Amazon OpenSearch Service-specific authentication.
5456
Aws(AwsAuthentication),
5557
}
@@ -210,6 +212,7 @@ pub enum ParseError {
210212
IndexTemplate { source: TemplateParseError },
211213
#[snafu(display("Batch action template parse error: {}", source))]
212214
BatchActionTemplate { source: TemplateParseError },
215+
#[cfg(feature = "aws-core")]
213216
#[snafu(display("aws.region required when AWS authentication is in use"))]
214217
RegionRequired,
215218
#[snafu(display("Endpoints option must be specified"))]

src/sinks/elasticsearch/service.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::{
44
task::{Context, Poll},
55
};
66

7+
#[cfg(feature = "aws-core")]
78
use aws_credential_types::provider::SharedCredentialsProvider;
9+
#[cfg(feature = "aws-core")]
810
use aws_types::region::Region;
911
use bytes::Bytes;
1012
use futures::future::BoxFuture;
@@ -17,6 +19,7 @@ use vector_common::{
1719
};
1820
use vector_core::{stream::DriverResponse, ByteSizeOf};
1921

22+
#[cfg(feature = "aws-core")]
2023
use crate::sinks::elasticsearch::sign_request;
2124
use crate::{
2225
event::{EventFinalizers, EventStatus, Finalizable},
@@ -96,10 +99,12 @@ impl ElasticsearchService {
9699
pub struct HttpRequestBuilder {
97100
pub bulk_uri: Uri,
98101
pub query_params: HashMap<String, String>,
102+
#[cfg(feature = "aws-core")]
99103
pub region: Option<Region>,
100104
pub compression: Compression,
101105
pub http_request_config: RequestConfig,
102106
pub http_auth: Option<Auth>,
107+
#[cfg(feature = "aws-core")]
103108
pub credentials_provider: Option<SharedCredentialsProvider>,
104109
}
105110

@@ -110,8 +115,10 @@ impl HttpRequestBuilder {
110115
http_request_config: config.request.clone(),
111116
http_auth: common.http_auth.clone(),
112117
query_params: common.query_params.clone(),
118+
#[cfg(feature = "aws-core")]
113119
region: common.region.clone(),
114120
compression: config.compression,
121+
#[cfg(feature = "aws-core")]
115122
credentials_provider: common.aws_auth.clone(),
116123
}
117124
}
@@ -140,10 +147,14 @@ impl HttpRequestBuilder {
140147
builder = auth.apply_builder(builder);
141148
}
142149

143-
let mut request = builder
150+
let request = builder
144151
.body(es_req.payload)
145152
.expect("Invalid http request value used");
146153

154+
#[cfg(feature = "aws-core")]
155+
let mut request = request;
156+
157+
#[cfg(feature = "aws-core")]
147158
if let Some(credentials_provider) = &self.credentials_provider {
148159
sign_request(&mut request, credentials_provider, &self.region).await?;
149160
}

src/sinks/prometheus/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub(crate) mod remote_write;
88

99
use vector_config::configurable_component;
1010

11+
#[cfg(feature = "aws-core")]
1112
use crate::aws::AwsAuthentication;
1213

1314
/// Authentication strategies.
@@ -33,6 +34,7 @@ pub enum PrometheusRemoteWriteAuth {
3334
token: SensitiveString,
3435
},
3536

37+
#[cfg(feature = "aws-core")]
3638
/// Amazon Prometheus Service-specific authentication.
3739
Aws(AwsAuthentication),
3840
}

src/sinks/prometheus/remote_write.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use vector_config::configurable_component;
1414
use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
1515

1616
use super::collector::{self, MetricCollector as _};
17+
#[cfg(feature = "aws-core")]
18+
use crate::aws::RegionOrEndpoint;
1719
use crate::{
18-
aws::RegionOrEndpoint,
1920
config::{self, AcknowledgementsConfig, Input, SinkConfig},
2021
event::{Event, Metric},
2122
http::{Auth, HttpClient},
@@ -48,6 +49,7 @@ impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings {
4849
enum Errors {
4950
#[snafu(display(r#"Prometheus remote_write sink cannot accept "set" metrics"#))]
5051
SetMetricInvalid,
52+
#[cfg(feature = "aws-core")]
5153
#[snafu(display("aws.region required when AWS authentication is in use"))]
5254
AwsRegionRequired,
5355
}
@@ -116,6 +118,7 @@ pub struct RemoteWriteConfig {
116118
#[configurable(derived)]
117119
pub auth: Option<PrometheusRemoteWriteAuth>,
118120

121+
#[cfg(feature = "aws-core")]
119122
#[configurable(derived)]
120123
#[configurable(metadata(docs::advanced))]
121124
pub aws: Option<RegionOrEndpoint>,
@@ -194,6 +197,7 @@ impl SinkConfig for RemoteWriteConfig {
194197
None,
195198
None,
196199
),
200+
#[cfg(feature = "aws-core")]
197201
Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
198202
let region = self
199203
.aws
@@ -406,11 +410,14 @@ impl HttpRequestBuilder {
406410
builder = builder.header("X-Scope-OrgID", tenant_id);
407411
}
408412

409-
let mut request = builder.body(body.into()).unwrap();
413+
let request = builder.body(body.into()).unwrap();
414+
#[cfg(feature = "aws-core")]
415+
let mut request = request;
410416
if let Some(http_auth) = &self.http_auth {
411417
http_auth.apply(&mut request);
412418
}
413419

420+
#[cfg(feature = "aws-core")]
414421
if let Some(credentials_provider) = &self.credentials_provider {
415422
sign_request(&mut request, credentials_provider, &self.aws_region).await?;
416423
}
@@ -440,6 +447,7 @@ fn compress_block(compression: Compression, data: Bytes) -> Vec<u8> {
440447
}
441448
}
442449

450+
#[cfg(feature = "aws-core")]
443451
async fn sign_request(
444452
request: &mut http::Request<Bytes>,
445453
credentials_provider: &SharedCredentialsProvider,

0 commit comments

Comments
 (0)