Skip to content

Commit e652ea4

Browse files
authored
enhancement(es sink): separate aws support in es & prometheus sink (#18288)
* feat: separate aws support in es & prometheus sink * remove redundant aws-core feature * modify auth aws feature * format code * fix clippy
1 parent 749594c commit e652ea4

File tree

12 files changed

+169
-151
lines changed

12 files changed

+169
-151
lines changed

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,7 @@ sinks-datadog_events = []
706706
sinks-datadog_logs = []
707707
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
708708
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
709-
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
709+
sinks-elasticsearch = ["transforms-metric_to_log"]
710710
sinks-file = ["dep:async-compression"]
711711
sinks-gcp = ["dep:base64", "gcp"]
712712
sinks-greptimedb = ["dep:greptimedb-client"]
@@ -721,7 +721,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
721721
sinks-new_relic_logs = ["sinks-http"]
722722
sinks-new_relic = []
723723
sinks-papertrail = ["dep:syslog"]
724-
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
724+
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
725725
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
726726
sinks-redis = ["dep:redis"]
727727
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
@@ -828,7 +828,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
828828
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
829829
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
830830
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
831-
es-integration-tests = ["sinks-elasticsearch"]
831+
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
832832
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
833833
fluent-integration-tests = ["docker", "sources-fluent"]
834834
gcp-cloud-storage-integration-tests = ["sinks-gcp"]

src/sinks/aws_kinesis/firehose/integration_tests.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use crate::{
1313
aws::{create_client, AwsAuthentication, ImdsAuthentication, RegionOrEndpoint},
1414
config::{ProxyConfig, SinkConfig, SinkContext},
1515
sinks::{
16-
elasticsearch::{BulkConfig, ElasticsearchAuth, ElasticsearchCommon, ElasticsearchConfig},
16+
elasticsearch::{
17+
BulkConfig, ElasticsearchAuthConfig, ElasticsearchCommon, ElasticsearchConfig,
18+
},
1719
util::{BatchConfig, Compression, TowerRequestConfig},
1820
},
1921
template::Template,
@@ -73,7 +75,7 @@ async fn firehose_put_records() {
7375
sleep(Duration::from_secs(5)).await;
7476

7577
let config = ElasticsearchConfig {
76-
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
78+
auth: Some(ElasticsearchAuthConfig::Aws(AwsAuthentication::Default {
7779
load_timeout_secs: Some(5),
7880
imds: ImdsAuthentication::default(),
7981
region: None,

src/sinks/axiom.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use vector_config::configurable_component;
66
use crate::{
77
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
88
sinks::{
9-
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
9+
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchConfig},
1010
util::{http::RequestConfig, Compression},
1111
Healthcheck, VectorSink,
1212
},
@@ -95,7 +95,7 @@ impl SinkConfig for AxiomConfig {
9595
let elasticsearch_config = ElasticsearchConfig {
9696
endpoints: vec![self.build_endpoint()],
9797
compression: self.compression,
98-
auth: Some(ElasticsearchAuth::Basic {
98+
auth: Some(ElasticsearchAuthConfig::Basic {
9999
user: "axiom".to_string(),
100100
password: self.token.clone(),
101101
}),

src/sinks/elasticsearch/common.rs

+47-56
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use std::collections::HashMap;
22

3-
use aws_credential_types::provider::SharedCredentialsProvider;
4-
use aws_types::region::Region;
53
use bytes::{Buf, Bytes};
64
use http::{Response, StatusCode, Uri};
75
use hyper::{body, Body};
@@ -15,11 +13,12 @@ use super::{
1513
InvalidHostSnafu, Request,
1614
};
1715
use crate::{
18-
http::{Auth, HttpClient, MaybeAuth},
16+
http::{HttpClient, MaybeAuth},
1917
sinks::{
2018
elasticsearch::{
21-
ElasticsearchAuth, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
19+
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
2220
},
21+
util::auth::Auth,
2322
util::{http::RequestConfig, TowerRequestConfig, UriSerde},
2423
HealthcheckError,
2524
},
@@ -31,12 +30,10 @@ use crate::{
3130
pub struct ElasticsearchCommon {
3231
pub base_url: String,
3332
pub bulk_uri: Uri,
34-
pub http_auth: Option<Auth>,
35-
pub aws_auth: Option<SharedCredentialsProvider>,
33+
pub auth: Option<Auth>,
3634
pub mode: ElasticsearchCommonMode,
3735
pub request_builder: ElasticsearchRequestBuilder,
3836
pub tls_settings: TlsSettings,
39-
pub region: Option<Region>,
4037
pub request: RequestConfig,
4138
pub query_params: HashMap<String, String>,
4239
pub metric_to_log: MetricToLog,
@@ -61,31 +58,35 @@ impl ElasticsearchCommon {
6158
.into());
6259
}
6360

64-
let authorization = match &config.auth {
65-
Some(ElasticsearchAuth::Basic { user, password }) => Some(Auth::Basic {
66-
user: user.clone(),
67-
password: password.clone(),
68-
}),
69-
_ => None,
70-
};
7161
let uri = endpoint.parse::<UriSerde>()?;
72-
let http_auth = authorization.choose_one(&uri.auth)?;
73-
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
74-
75-
let aws_auth = match &config.auth {
76-
Some(ElasticsearchAuth::Basic { .. }) | None => None,
77-
Some(ElasticsearchAuth::Aws(aws)) => {
62+
let auth = match &config.auth {
63+
Some(ElasticsearchAuthConfig::Basic { user, password }) => {
64+
let auth = Some(crate::http::Auth::Basic {
65+
user: user.clone(),
66+
password: password.clone(),
67+
});
68+
// basic auth must be some for now
69+
let auth = auth.choose_one(&uri.auth)?.unwrap();
70+
Some(Auth::Basic(auth))
71+
}
72+
#[cfg(feature = "aws-core")]
73+
Some(ElasticsearchAuthConfig::Aws(aws)) => {
7874
let region = config
7975
.aws
8076
.as_ref()
8177
.map(|config| config.region())
8278
.ok_or(ParseError::RegionRequired)?
8379
.ok_or(ParseError::RegionRequired)?;
84-
85-
Some(aws.credentials_provider(region).await?)
80+
Some(Auth::Aws {
81+
credentials_provider: aws.credentials_provider(region.clone()).await?,
82+
region,
83+
})
8684
}
85+
None => None,
8786
};
8887

88+
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();
89+
8990
let mode = config.common_mode()?;
9091

9192
let tower_request = config
@@ -126,8 +127,6 @@ impl ElasticsearchCommon {
126127
metric_config.metric_tag_values,
127128
);
128129

129-
let region = config.aws.as_ref().and_then(|config| config.region());
130-
131130
let version = if let Some(version) = *version {
132131
version
133132
} else {
@@ -136,16 +135,7 @@ impl ElasticsearchCommon {
136135
ElasticsearchApiVersion::V7 => 7,
137136
ElasticsearchApiVersion::V8 => 8,
138137
ElasticsearchApiVersion::Auto => {
139-
match get_version(
140-
&base_url,
141-
&http_auth,
142-
&aws_auth,
143-
&region,
144-
&request,
145-
&tls_settings,
146-
proxy_config,
147-
)
148-
.await
138+
match get_version(&base_url, &auth, &request, &tls_settings, proxy_config).await
149139
{
150140
Ok(version) => {
151141
debug!(message = "Auto-detected Elasticsearch API version.", %version);
@@ -195,15 +185,13 @@ impl ElasticsearchCommon {
195185
};
196186

197187
Ok(Self {
198-
http_auth,
188+
auth,
199189
base_url,
200190
bulk_uri,
201-
aws_auth,
202191
mode,
203192
request_builder,
204193
query_params,
205194
request,
206-
region,
207195
tls_settings,
208196
metric_to_log,
209197
})
@@ -248,9 +236,7 @@ impl ElasticsearchCommon {
248236
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
249237
match get(
250238
&self.base_url,
251-
&self.http_auth,
252-
&self.aws_auth,
253-
&self.region,
239+
&self.auth,
254240
&self.request,
255241
client,
256242
"/_cluster/health",
@@ -264,19 +250,18 @@ impl ElasticsearchCommon {
264250
}
265251
}
266252

253+
#[cfg(feature = "aws-core")]
267254
pub async fn sign_request(
268255
request: &mut http::Request<Bytes>,
269-
credentials_provider: &SharedCredentialsProvider,
270-
region: &Option<Region>,
256+
credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
257+
region: &Option<aws_types::region::Region>,
271258
) -> crate::Result<()> {
272259
crate::aws::sign_request("es", request, credentials_provider, region).await
273260
}
274261

275262
async fn get_version(
276263
base_url: &str,
277-
http_auth: &Option<Auth>,
278-
aws_auth: &Option<SharedCredentialsProvider>,
279-
region: &Option<Region>,
264+
auth: &Option<Auth>,
280265
request: &RequestConfig,
281266
tls_settings: &TlsSettings,
282267
proxy_config: &ProxyConfig,
@@ -291,7 +276,7 @@ async fn get_version(
291276
}
292277

293278
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
294-
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
279+
let response = get(base_url, auth, request, client, "/")
295280
.await
296281
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
297282

@@ -314,28 +299,34 @@ async fn get_version(
314299

315300
async fn get(
316301
base_url: &str,
317-
http_auth: &Option<Auth>,
318-
aws_auth: &Option<SharedCredentialsProvider>,
319-
region: &Option<Region>,
302+
auth: &Option<Auth>,
320303
request: &RequestConfig,
321304
client: HttpClient,
322305
path: &str,
323306
) -> crate::Result<Response<Body>> {
324307
let mut builder = Request::get(format!("{}{}", base_url, path));
325308

326-
if let Some(authorization) = &http_auth {
327-
builder = authorization.apply_builder(builder);
328-
}
329-
330309
for (header, value) in &request.headers {
331310
builder = builder.header(&header[..], &value[..]);
332311
}
333-
334312
let mut request = builder.body(Bytes::new())?;
335313

336-
if let Some(credentials_provider) = aws_auth {
337-
sign_request(&mut request, credentials_provider, region).await?;
314+
if let Some(auth) = auth {
315+
match auth {
316+
Auth::Basic(http_auth) => {
317+
http_auth.apply(&mut request);
318+
}
319+
#[cfg(feature = "aws-core")]
320+
Auth::Aws {
321+
credentials_provider: provider,
322+
region,
323+
} => {
324+
let region = region.clone();
325+
sign_request(&mut request, provider, &Some(region)).await?;
326+
}
327+
}
338328
}
329+
339330
client
340331
.send(request.map(hyper::Body::from))
341332
.await

src/sinks/elasticsearch/config.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use futures::{FutureExt, TryFutureExt};
77
use vector_config::configurable_component;
88

99
use crate::{
10-
aws::RegionOrEndpoint,
1110
codecs::Transformer,
1211
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
1312
event::{EventRef, LogEvent, Value},
@@ -19,7 +18,7 @@ use crate::{
1918
retry::ElasticsearchRetryLogic,
2019
service::{ElasticsearchService, HttpRequestBuilder},
2120
sink::ElasticsearchSink,
22-
ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchCommon,
21+
ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
2322
ElasticsearchCommonMode, ElasticsearchMode,
2423
},
2524
util::{
@@ -142,7 +141,7 @@ pub struct ElasticsearchConfig {
142141
pub request: RequestConfig,
143142

144143
#[configurable(derived)]
145-
pub auth: Option<ElasticsearchAuth>,
144+
pub auth: Option<ElasticsearchAuthConfig>,
146145

147146
/// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
148147
#[serde(default)]
@@ -153,7 +152,8 @@ pub struct ElasticsearchConfig {
153152

154153
#[serde(default)]
155154
#[configurable(derived)]
156-
pub aws: Option<RegionOrEndpoint>,
155+
#[cfg(feature = "aws-core")]
156+
pub aws: Option<crate::aws::RegionOrEndpoint>,
157157

158158
#[serde(default)]
159159
#[configurable(derived)]
@@ -215,6 +215,7 @@ impl Default for ElasticsearchConfig {
215215
request: Default::default(),
216216
auth: None,
217217
query: None,
218+
#[cfg(feature = "aws-core")]
218219
aws: None,
219220
tls: None,
220221
endpoint_health: None,

0 commit comments

Comments
 (0)