Skip to content

Commit af18849

Browse files
authored
fix(elasticsearch sink): Revert add api_version option (#15006)
Revert "enhancement(elasticsearch sink): Add `api_version` option (#14918)" This reverts commit 4e45127.
1 parent 4e45127 commit af18849

File tree

8 files changed

+87
-343
lines changed

8 files changed

+87
-343
lines changed

src/sinks/axiom.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
SinkContext,
1010
},
1111
sinks::{
12-
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
12+
elasticsearch::{ElasticsearchAuth, ElasticsearchConfig},
1313
util::{http::RequestConfig, Compression},
1414
Healthcheck, VectorSink,
1515
},
@@ -100,7 +100,6 @@ impl SinkConfig for AxiomConfig {
100100
query: Some(query),
101101
tls: self.tls.clone(),
102102
request,
103-
api_version: ElasticsearchApiVersion::V6,
104103
..Default::default()
105104
};
106105

src/sinks/elasticsearch/common.rs

+42-149
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@ use std::collections::HashMap;
22

33
use aws_types::credentials::SharedCredentialsProvider;
44
use aws_types::region::Region;
5-
use bytes::{Buf, Bytes};
6-
use http::{Response, StatusCode, Uri};
7-
use hyper::{body, Body};
8-
use serde::Deserialize;
5+
use bytes::Bytes;
6+
use http::{StatusCode, Uri};
97
use snafu::ResultExt;
10-
use vector_core::config::proxy::ProxyConfig;
118

129
use super::{
13-
request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
14-
InvalidHostSnafu, Request,
10+
request_builder::ElasticsearchRequestBuilder, ElasticsearchEncoder, InvalidHostSnafu, Request,
1511
};
1612
use crate::{
1713
http::{Auth, HttpClient, MaybeAuth},
@@ -42,12 +38,7 @@ pub struct ElasticsearchCommon {
4238
}
4339

4440
impl ElasticsearchCommon {
45-
pub async fn parse_config(
46-
config: &ElasticsearchConfig,
47-
endpoint: &str,
48-
proxy_config: &ProxyConfig,
49-
version: &mut Option<usize>,
50-
) -> crate::Result<Self> {
41+
pub async fn parse_config(config: &ElasticsearchConfig, endpoint: &str) -> crate::Result<Self> {
5142
// Test the configured host, but ignore the result
5243
let uri = format!("{}/_test", endpoint);
5344
let uri = uri
@@ -87,6 +78,16 @@ impl ElasticsearchCommon {
8778

8879
let mode = config.common_mode()?;
8980

81+
let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into());
82+
let request_builder = ElasticsearchRequestBuilder {
83+
compression: config.compression,
84+
encoder: ElasticsearchEncoder {
85+
transformer: config.encoding.clone(),
86+
doc_type,
87+
suppress_type_name: config.suppress_type_name,
88+
},
89+
};
90+
9091
let tower_request = config
9192
.request
9293
.tower
@@ -102,13 +103,11 @@ impl ElasticsearchCommon {
102103
query_params.insert("pipeline".into(), pipeline.into());
103104
}
104105

105-
let bulk_url = {
106-
let mut query = url::form_urlencoded::Serializer::new(String::new());
107-
for (p, v) in &query_params {
108-
query.append_pair(&p[..], &v[..]);
109-
}
110-
format!("{}/_bulk?{}", base_url, query.finish())
111-
};
106+
let mut query = url::form_urlencoded::Serializer::new(String::new());
107+
for (p, v) in &query_params {
108+
query.append_pair(&p[..], &v[..]);
109+
}
110+
let bulk_url = format!("{}/_bulk?{}", base_url, query.finish());
112111
let bulk_uri = bulk_url.parse::<Uri>().unwrap();
113112

114113
let tls_settings = TlsSettings::from_options(&config.tls)?;
@@ -123,46 +122,6 @@ impl ElasticsearchCommon {
123122

124123
let region = config.aws.as_ref().and_then(|config| config.region());
125124

126-
let version = if let Some(version) = *version {
127-
version
128-
} else {
129-
let ver = match config.api_version {
130-
ElasticsearchApiVersion::V6 => 6,
131-
ElasticsearchApiVersion::V7 => 7,
132-
ElasticsearchApiVersion::V8 => 8,
133-
ElasticsearchApiVersion::Auto => {
134-
get_version(
135-
&base_url,
136-
&http_auth,
137-
&aws_auth,
138-
&region,
139-
&request,
140-
&tls_settings,
141-
proxy_config,
142-
)
143-
.await?
144-
}
145-
};
146-
*version = Some(ver);
147-
ver
148-
};
149-
150-
let doc_type = config.doc_type.clone().unwrap_or_else(|| "_doc".into());
151-
let suppress_type_name = if let Some(suppress_type_name) = config.suppress_type_name {
152-
warn!(message = "DEPRECATION, use of deprecated option `suppress_type_name`. Please use `api_version` option instead.");
153-
suppress_type_name
154-
} else {
155-
version >= 7
156-
};
157-
let request_builder = ElasticsearchRequestBuilder {
158-
compression: config.compression,
159-
encoder: ElasticsearchEncoder {
160-
transformer: config.encoding.clone(),
161-
doc_type,
162-
suppress_type_name,
163-
},
164-
};
165-
166125
Ok(Self {
167126
http_auth,
168127
base_url,
@@ -179,17 +138,11 @@ impl ElasticsearchCommon {
179138
}
180139

181140
/// Parses endpoints into a vector of ElasticsearchCommons. The resulting vector is guaranteed to not be empty.
182-
pub async fn parse_many(
183-
config: &ElasticsearchConfig,
184-
proxy_config: &ProxyConfig,
185-
) -> crate::Result<Vec<Self>> {
186-
let mut version = None;
141+
pub async fn parse_many(config: &ElasticsearchConfig) -> crate::Result<Vec<Self>> {
187142
if let Some(endpoint) = config.endpoint.as_ref() {
188143
warn!(message = "DEPRECATION, use of deprecated option `endpoint`. Please use `endpoints` option instead.");
189144
if config.endpoints.is_empty() {
190-
Ok(vec![
191-
Self::parse_config(config, endpoint, proxy_config, &mut version).await?,
192-
])
145+
Ok(vec![Self::parse_config(config, endpoint).await?])
193146
} else {
194147
Err(ParseError::EndpointsExclusive.into())
195148
}
@@ -198,8 +151,7 @@ impl ElasticsearchCommon {
198151
} else {
199152
let mut commons = Vec::new();
200153
for endpoint in config.endpoints.iter() {
201-
commons
202-
.push(Self::parse_config(config, endpoint, proxy_config, &mut version).await?);
154+
commons.push(Self::parse_config(config, endpoint).await?);
203155
}
204156
Ok(commons)
205157
}
@@ -208,25 +160,30 @@ impl ElasticsearchCommon {
208160
/// Parses a single endpoint, else panics.
209161
#[cfg(test)]
210162
pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
211-
let mut commons =
212-
Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?;
213-
assert_eq!(commons.len(), 1);
163+
let mut commons = Self::parse_many(config).await?;
164+
assert!(commons.len() == 1);
214165
Ok(commons.remove(0))
215166
}
216167

217168
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
218-
match get(
219-
&self.base_url,
220-
&self.http_auth,
221-
&self.aws_auth,
222-
&self.region,
223-
&self.request,
224-
client,
225-
"/_cluster/health",
226-
)
227-
.await?
228-
.status()
229-
{
169+
let mut builder = Request::get(format!("{}/_cluster/health", self.base_url));
170+
171+
if let Some(authorization) = &self.http_auth {
172+
builder = authorization.apply_builder(builder);
173+
}
174+
175+
for (header, value) in &self.request.headers {
176+
builder = builder.header(&header[..], &value[..]);
177+
}
178+
179+
let mut request = builder.body(Bytes::new())?;
180+
181+
if let Some(credentials_provider) = &self.aws_auth {
182+
sign_request(&mut request, credentials_provider, &self.region).await?;
183+
}
184+
let response = client.send(request.map(hyper::Body::from)).await?;
185+
186+
match response.status() {
230187
StatusCode::OK => Ok(()),
231188
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
232189
}
@@ -240,67 +197,3 @@ pub async fn sign_request(
240197
) -> crate::Result<()> {
241198
crate::aws::sign_request("es", request, credentials_provider, region).await
242199
}
243-
244-
async fn get_version(
245-
base_url: &str,
246-
http_auth: &Option<Auth>,
247-
aws_auth: &Option<SharedCredentialsProvider>,
248-
region: &Option<Region>,
249-
request: &RequestConfig,
250-
tls_settings: &TlsSettings,
251-
proxy_config: &ProxyConfig,
252-
) -> crate::Result<usize> {
253-
#[derive(Deserialize)]
254-
struct ClusterState {
255-
version: usize,
256-
}
257-
258-
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
259-
let response = get(
260-
base_url,
261-
http_auth,
262-
aws_auth,
263-
region,
264-
request,
265-
client,
266-
"/_cluster/state/version",
267-
)
268-
.await
269-
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
270-
271-
let (_, body) = response.into_parts();
272-
let mut body = body::aggregate(body).await?;
273-
let body = body.copy_to_bytes(body.remaining());
274-
let ClusterState { version } = serde_json::from_slice(&body)?;
275-
Ok(version)
276-
}
277-
278-
async fn get(
279-
base_url: &str,
280-
http_auth: &Option<Auth>,
281-
aws_auth: &Option<SharedCredentialsProvider>,
282-
region: &Option<Region>,
283-
request: &RequestConfig,
284-
client: HttpClient,
285-
path: &str,
286-
) -> crate::Result<Response<Body>> {
287-
let mut builder = Request::get(format!("{}{}", base_url, path));
288-
289-
if let Some(authorization) = &http_auth {
290-
builder = authorization.apply_builder(builder);
291-
}
292-
293-
for (header, value) in &request.headers {
294-
builder = builder.header(&header[..], &value[..]);
295-
}
296-
297-
let mut request = builder.body(Bytes::new())?;
298-
299-
if let Some(credentials_provider) = aws_auth {
300-
sign_request(&mut request, credentials_provider, region).await?;
301-
}
302-
client
303-
.send(request.map(hyper::Body::from))
304-
.await
305-
.map_err(Into::into)
306-
}

src/sinks/elasticsearch/config.rs

+5-35
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use crate::{
2020
retry::ElasticsearchRetryLogic,
2121
service::{ElasticsearchService, HttpRequestBuilder},
2222
sink::ElasticsearchSink,
23-
BatchActionTemplateSnafu, ElasticsearchApiVersion, ElasticsearchAuth,
24-
ElasticsearchCommon, ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu,
23+
BatchActionTemplateSnafu, ElasticsearchAuth, ElasticsearchCommon,
24+
ElasticsearchCommonMode, ElasticsearchMode, IndexTemplateSnafu,
2525
},
2626
util::{
2727
http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
@@ -61,19 +61,13 @@ pub struct ElasticsearchConfig {
6161
/// set this option since Elasticsearch has removed it.
6262
pub doc_type: Option<String>,
6363

64-
/// The API version of Elasticsearch.
65-
#[serde(default)]
66-
pub api_version: ElasticsearchApiVersion,
67-
6864
/// Whether or not to send the `type` field to Elasticsearch.
6965
///
7066
/// `type` field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
7167
///
7268
/// If enabled, the `doc_type` option will be ignored.
73-
///
74-
/// This option has been deprecated, the `api_version` option should be used instead.
75-
#[configurable(deprecated)]
76-
pub suppress_type_name: Option<bool>,
69+
#[serde(default)]
70+
pub suppress_type_name: bool,
7771

7872
/// The name of the event key that should map to Elasticsearch’s [`_id` field][es_id].
7973
///
@@ -381,7 +375,7 @@ impl DataStreamConfig {
381375
#[async_trait::async_trait]
382376
impl SinkConfig for ElasticsearchConfig {
383377
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
384-
let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
378+
let commons = ElasticsearchCommon::parse_many(self).await?;
385379
let common = commons[0].clone();
386380

387381
let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
@@ -489,28 +483,4 @@ mod tests {
489483
)
490484
.unwrap();
491485
}
492-
493-
#[test]
494-
fn parse_version() {
495-
let config = toml::from_str::<ElasticsearchConfig>(
496-
r#"
497-
endpoints = [""]
498-
api_version = "v7"
499-
"#,
500-
)
501-
.unwrap();
502-
assert_eq!(config.api_version, ElasticsearchApiVersion::V7);
503-
}
504-
505-
#[test]
506-
fn parse_version_auto() {
507-
let config = toml::from_str::<ElasticsearchConfig>(
508-
r#"
509-
endpoints = [""]
510-
api_version = "auto"
511-
"#,
512-
)
513-
.unwrap();
514-
assert_eq!(config.api_version, ElasticsearchApiVersion::Auto);
515-
}
516486
}

0 commit comments

Comments
 (0)