Skip to content

Commit c543d9d

Browse files
committed
feat: separate aws support in es & prometheus sink
1 parent 61c0ae8 commit c543d9d

File tree

11 files changed

+359
-304
lines changed

11 files changed

+359
-304
lines changed

Cargo.lock

+232-194
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+8-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ path = "src/config/loading/secret_backend_example.rs"
3535
test = false
3636
bench = false
3737

38+
# Debug symbols end up chewing up several GB of disk space, so better to just
39+
# disable them.
40+
[profile.dev]
41+
debug = false
42+
3843
# CI-based builds use full release optimization. See scripts/environment/release-flags.sh.
3944
# This results in roughly a 5% reduction in performance when compiling locally vs when
4045
# compiled via the CI pipeline.
@@ -696,7 +701,7 @@ sinks-datadog_events = []
696701
sinks-datadog_logs = []
697702
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
698703
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
699-
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
704+
sinks-elasticsearch = ["transforms-metric_to_log"]
700705
sinks-file = ["dep:async-compression"]
701706
sinks-gcp = ["dep:base64", "gcp"]
702707
sinks-greptimedb = ["dep:greptimedb-client"]
@@ -711,7 +716,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
711716
sinks-new_relic_logs = ["sinks-http"]
712717
sinks-new_relic = []
713718
sinks-papertrail = ["dep:syslog"]
714-
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
719+
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
715720
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
716721
sinks-redis = ["dep:redis"]
717722
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
@@ -818,7 +823,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
818823
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
819824
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
820825
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
821-
es-integration-tests = ["sinks-elasticsearch"]
826+
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
822827
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
823828
fluent-integration-tests = ["docker", "sources-fluent"]
824829
gcp-cloud-storage-integration-tests = ["sinks-gcp"]

src/sinks/axiom.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use vector_config::configurable_component;
66
use crate::{
77
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
88
sinks::{
9-
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
9+
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchConfig},
1010
util::{http::RequestConfig, Compression},
1111
Healthcheck, VectorSink,
1212
},
@@ -95,7 +95,7 @@ impl SinkConfig for AxiomConfig {
9595
let elasticsearch_config = ElasticsearchConfig {
9696
endpoints: vec![self.build_endpoint()],
9797
compression: self.compression,
98-
auth: Some(ElasticsearchAuth::Basic {
98+
auth: Some(ElasticsearchAuthConfig::Basic {
9999
user: "axiom".to_string(),
100100
password: self.token.clone(),
101101
}),

src/sinks/elasticsearch/common.rs

+44-52
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::HashMap;
22

3+
#[cfg(feature = "aws-core")]
34
use aws_credential_types::provider::SharedCredentialsProvider;
5+
#[cfg(feature = "aws-core")]
46
use aws_types::region::Region;
57
use bytes::{Buf, Bytes};
68
use http::{Response, StatusCode, Uri};
@@ -15,11 +17,12 @@ use super::{
1517
InvalidHostSnafu, Request,
1618
};
1719
use crate::{
18-
http::{Auth, HttpClient, MaybeAuth},
20+
http::{HttpClient, MaybeAuth},
1921
sinks::{
2022
elasticsearch::{
21-
ElasticsearchAuth, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
23+
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
2224
},
25+
util::auth::Auth,
2326
util::{http::RequestConfig, TowerRequestConfig, UriSerde},
2427
HealthcheckError,
2528
},
@@ -31,12 +34,10 @@ use crate::{
3134
pub struct ElasticsearchCommon {
3235
pub base_url: String,
3336
pub bulk_uri: Uri,
34-
pub http_auth: Option<Auth>,
35-
pub aws_auth: Option<SharedCredentialsProvider>,
37+
pub auth: Option<Auth>,
3638
pub mode: ElasticsearchCommonMode,
3739
pub request_builder: ElasticsearchRequestBuilder,
3840
pub tls_settings: TlsSettings,
39-
pub region: Option<Region>,
4041
pub request: RequestConfig,
4142
pub query_params: HashMap<String, String>,
4243
pub metric_to_log: MetricToLog,
@@ -61,31 +62,35 @@ impl ElasticsearchCommon {
6162
.into());
6263
}
6364

64-
let authorization = match &config.auth {
65-
Some(ElasticsearchAuth::Basic { user, password }) => Some(Auth::Basic {
66-
user: user.clone(),
67-
password: password.clone(),
68-
}),
69-
_ => None,
70-
};
7165
let uri = endpoint.parse::<UriSerde>()?;
72-
let http_auth = authorization.choose_one(&uri.auth)?;
73-
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
74-
75-
let aws_auth = match &config.auth {
76-
Some(ElasticsearchAuth::Basic { .. }) | None => None,
77-
Some(ElasticsearchAuth::Aws(aws)) => {
66+
let auth = match &config.auth {
67+
Some(ElasticsearchAuthConfig::Basic { user, password }) => {
68+
let auth = Some(crate::http::Auth::Basic {
69+
user: user.clone(),
70+
password: password.clone(),
71+
});
72+
// basic auth must be some for now
73+
let auth = auth.choose_one(&uri.auth)?.unwrap();
74+
Some(Auth::Basic(auth))
75+
}
76+
#[cfg(feature = "aws-core")]
77+
Some(ElasticsearchAuthConfig::Aws(aws)) => {
7878
let region = config
7979
.aws
8080
.as_ref()
8181
.map(|config| config.region())
8282
.ok_or(ParseError::RegionRequired)?
8383
.ok_or(ParseError::RegionRequired)?;
84-
85-
Some(aws.credentials_provider(region).await?)
84+
Some(Auth::Aws {
85+
provider: aws.credentials_provider(region.clone()).await?,
86+
region,
87+
})
8688
}
89+
None => None,
8790
};
8891

92+
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
93+
8994
let mode = config.common_mode()?;
9095

9196
let tower_request = config
@@ -126,8 +131,6 @@ impl ElasticsearchCommon {
126131
metric_config.metric_tag_values,
127132
);
128133

129-
let region = config.aws.as_ref().and_then(|config| config.region());
130-
131134
let version = if let Some(version) = *version {
132135
version
133136
} else {
@@ -136,16 +139,7 @@ impl ElasticsearchCommon {
136139
ElasticsearchApiVersion::V7 => 7,
137140
ElasticsearchApiVersion::V8 => 8,
138141
ElasticsearchApiVersion::Auto => {
139-
match get_version(
140-
&base_url,
141-
&http_auth,
142-
&aws_auth,
143-
&region,
144-
&request,
145-
&tls_settings,
146-
proxy_config,
147-
)
148-
.await
142+
match get_version(&base_url, &auth, &request, &tls_settings, proxy_config).await
149143
{
150144
Ok(version) => {
151145
debug!(message = "Auto-detected Elasticsearch API version.", %version);
@@ -195,15 +189,13 @@ impl ElasticsearchCommon {
195189
};
196190

197191
Ok(Self {
198-
http_auth,
192+
auth,
199193
base_url,
200194
bulk_uri,
201-
aws_auth,
202195
mode,
203196
request_builder,
204197
query_params,
205198
request,
206-
region,
207199
tls_settings,
208200
metric_to_log,
209201
})
@@ -248,9 +240,7 @@ impl ElasticsearchCommon {
248240
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
249241
match get(
250242
&self.base_url,
251-
&self.http_auth,
252-
&self.aws_auth,
253-
&self.region,
243+
&self.auth,
254244
&self.request,
255245
client,
256246
"/_cluster/health",
@@ -264,6 +254,7 @@ impl ElasticsearchCommon {
264254
}
265255
}
266256

257+
#[cfg(feature = "aws-core")]
267258
pub async fn sign_request(
268259
request: &mut http::Request<Bytes>,
269260
credentials_provider: &SharedCredentialsProvider,
@@ -274,9 +265,7 @@ pub async fn sign_request(
274265

275266
async fn get_version(
276267
base_url: &str,
277-
http_auth: &Option<Auth>,
278-
aws_auth: &Option<SharedCredentialsProvider>,
279-
region: &Option<Region>,
268+
auth: &Option<Auth>,
280269
request: &RequestConfig,
281270
tls_settings: &TlsSettings,
282271
proxy_config: &ProxyConfig,
@@ -291,7 +280,7 @@ async fn get_version(
291280
}
292281

293282
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
294-
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
283+
let response = get(base_url, auth, request, client, "/")
295284
.await
296285
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
297286

@@ -314,28 +303,31 @@ async fn get_version(
314303

315304
async fn get(
316305
base_url: &str,
317-
http_auth: &Option<Auth>,
318-
aws_auth: &Option<SharedCredentialsProvider>,
319-
region: &Option<Region>,
306+
auth: &Option<Auth>,
320307
request: &RequestConfig,
321308
client: HttpClient,
322309
path: &str,
323310
) -> crate::Result<Response<Body>> {
324311
let mut builder = Request::get(format!("{}{}", base_url, path));
325312

326-
if let Some(authorization) = &http_auth {
327-
builder = authorization.apply_builder(builder);
328-
}
329-
330313
for (header, value) in &request.headers {
331314
builder = builder.header(&header[..], &value[..]);
332315
}
333-
334316
let mut request = builder.body(Bytes::new())?;
335317

336-
if let Some(credentials_provider) = aws_auth {
337-
sign_request(&mut request, credentials_provider, region).await?;
318+
if let Some(auth) = auth {
319+
match auth {
320+
Auth::Basic(http_auth) => {
321+
http_auth.apply(&mut request);
322+
}
323+
#[cfg(feature = "aws-core")]
324+
Auth::Aws { provider, region } => {
325+
let region = region.clone();
326+
sign_request(&mut request, provider, &Some(region)).await?;
327+
}
328+
}
338329
}
330+
339331
client
340332
.send(request.map(hyper::Body::from))
341333
.await

src/sinks/elasticsearch/config.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use std::{
66
use futures::{FutureExt, TryFutureExt};
77
use vector_config::configurable_component;
88

9+
#[cfg(feature = "aws-core")]
10+
use crate::aws::RegionOrEndpoint;
911
use crate::{
10-
aws::RegionOrEndpoint,
1112
codecs::Transformer,
1213
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
1314
event::{EventRef, LogEvent, Value},
@@ -19,7 +20,7 @@ use crate::{
1920
retry::ElasticsearchRetryLogic,
2021
service::{ElasticsearchService, HttpRequestBuilder},
2122
sink::ElasticsearchSink,
22-
ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchCommon,
23+
ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
2324
ElasticsearchCommonMode, ElasticsearchMode,
2425
},
2526
util::{
@@ -142,7 +143,7 @@ pub struct ElasticsearchConfig {
142143
pub request: RequestConfig,
143144

144145
#[configurable(derived)]
145-
pub auth: Option<ElasticsearchAuth>,
146+
pub auth: Option<ElasticsearchAuthConfig>,
146147

147148
/// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
148149
#[serde(default)]
@@ -153,6 +154,7 @@ pub struct ElasticsearchConfig {
153154

154155
#[serde(default)]
155156
#[configurable(derived)]
157+
#[cfg(feature = "aws-core")]
156158
pub aws: Option<RegionOrEndpoint>,
157159

158160
#[serde(default)]
@@ -215,6 +217,7 @@ impl Default for ElasticsearchConfig {
215217
request: Default::default(),
216218
auth: None,
217219
query: None,
220+
#[cfg(feature = "aws-core")]
218221
aws: None,
219222
tls: None,
220223
endpoint_health: None,

src/sinks/elasticsearch/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use snafu::Snafu;
2424
use vector_common::sensitive_string::SensitiveString;
2525
use vector_config::configurable_component;
2626

27+
#[cfg(feature = "aws-core")]
2728
use crate::aws::AwsAuthentication;
2829
use crate::{
2930
event::{EventRef, LogEvent},
@@ -36,7 +37,7 @@ use crate::{
3637
#[derive(Clone, Debug)]
3738
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
3839
#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))]
39-
pub enum ElasticsearchAuth {
40+
pub enum ElasticsearchAuthConfig {
4041
/// HTTP Basic Authentication.
4142
Basic {
4243
/// Basic authentication username.
@@ -50,6 +51,7 @@ pub enum ElasticsearchAuth {
5051
password: SensitiveString,
5152
},
5253

54+
#[cfg(feature = "aws-core")]
5355
/// Amazon OpenSearch Service-specific authentication.
5456
Aws(AwsAuthentication),
5557
}
@@ -210,6 +212,7 @@ pub enum ParseError {
210212
IndexTemplate { source: TemplateParseError },
211213
#[snafu(display("Batch action template parse error: {}", source))]
212214
BatchActionTemplate { source: TemplateParseError },
215+
#[cfg(feature = "aws-core")]
213216
#[snafu(display("aws.region required when AWS authentication is in use"))]
214217
RegionRequired,
215218
#[snafu(display("Endpoints option must be specified"))]

0 commit comments

Comments
 (0)