1
1
use std:: collections:: HashMap ;
2
2
3
+ #[ cfg( feature = "aws-core" ) ]
3
4
use aws_credential_types:: provider:: SharedCredentialsProvider ;
5
+ #[ cfg( feature = "aws-core" ) ]
4
6
use aws_types:: region:: Region ;
5
7
use bytes:: { Buf , Bytes } ;
6
8
use http:: { Response , StatusCode , Uri } ;
@@ -15,11 +17,12 @@ use super::{
15
17
InvalidHostSnafu , Request ,
16
18
} ;
17
19
use crate :: {
18
- http:: { Auth , HttpClient , MaybeAuth } ,
20
+ http:: { HttpClient , MaybeAuth } ,
19
21
sinks:: {
20
22
elasticsearch:: {
21
- ElasticsearchAuth , ElasticsearchCommonMode , ElasticsearchConfig , ParseError ,
23
+ ElasticsearchAuthConfig , ElasticsearchCommonMode , ElasticsearchConfig , ParseError ,
22
24
} ,
25
+ util:: auth:: Auth ,
23
26
util:: { http:: RequestConfig , TowerRequestConfig , UriSerde } ,
24
27
HealthcheckError ,
25
28
} ,
@@ -31,12 +34,10 @@ use crate::{
31
34
pub struct ElasticsearchCommon {
32
35
pub base_url : String ,
33
36
pub bulk_uri : Uri ,
34
- pub http_auth : Option < Auth > ,
35
- pub aws_auth : Option < SharedCredentialsProvider > ,
37
+ pub auth : Option < Auth > ,
36
38
pub mode : ElasticsearchCommonMode ,
37
39
pub request_builder : ElasticsearchRequestBuilder ,
38
40
pub tls_settings : TlsSettings ,
39
- pub region : Option < Region > ,
40
41
pub request : RequestConfig ,
41
42
pub query_params : HashMap < String , String > ,
42
43
pub metric_to_log : MetricToLog ,
@@ -61,31 +62,35 @@ impl ElasticsearchCommon {
61
62
. into ( ) ) ;
62
63
}
63
64
64
- let authorization = match & config. auth {
65
- Some ( ElasticsearchAuth :: Basic { user, password } ) => Some ( Auth :: Basic {
66
- user : user. clone ( ) ,
67
- password : password. clone ( ) ,
68
- } ) ,
69
- _ => None ,
70
- } ;
71
65
let uri = endpoint. parse :: < UriSerde > ( ) ?;
72
- let http_auth = authorization. choose_one ( & uri. auth ) ?;
73
- let base_url = uri. uri . to_string ( ) . trim_end_matches ( '/' ) . to_owned ( ) ;
74
-
75
- let aws_auth = match & config. auth {
76
- Some ( ElasticsearchAuth :: Basic { .. } ) | None => None ,
77
- Some ( ElasticsearchAuth :: Aws ( aws) ) => {
66
+ let auth = match & config. auth {
67
+ Some ( ElasticsearchAuthConfig :: Basic { user, password } ) => {
68
+ let auth = Some ( crate :: http:: Auth :: Basic {
69
+ user : user. clone ( ) ,
70
+ password : password. clone ( ) ,
71
+ } ) ;
72
+ // basic auth must be some for now
73
+ let auth = auth. choose_one ( & uri. auth ) ?. unwrap ( ) ;
74
+ Some ( Auth :: Basic ( auth) )
75
+ }
76
+ #[ cfg( feature = "aws-core" ) ]
77
+ Some ( ElasticsearchAuthConfig :: Aws ( aws) ) => {
78
78
let region = config
79
79
. aws
80
80
. as_ref ( )
81
81
. map ( |config| config. region ( ) )
82
82
. ok_or ( ParseError :: RegionRequired ) ?
83
83
. ok_or ( ParseError :: RegionRequired ) ?;
84
-
85
- Some ( aws. credentials_provider ( region) . await ?)
84
+ Some ( Auth :: Aws {
85
+ credentials_provider : aws. credentials_provider ( region. clone ( ) ) . await ?,
86
+ region,
87
+ } )
86
88
}
89
+ None => None ,
87
90
} ;
88
91
92
+ let base_url = uri. uri . to_string ( ) . trim_end_matches ( '/' ) . to_owned ( ) ;
93
+
89
94
let mode = config. common_mode ( ) ?;
90
95
91
96
let tower_request = config
@@ -126,8 +131,6 @@ impl ElasticsearchCommon {
126
131
metric_config. metric_tag_values ,
127
132
) ;
128
133
129
- let region = config. aws . as_ref ( ) . and_then ( |config| config. region ( ) ) ;
130
-
131
134
let version = if let Some ( version) = * version {
132
135
version
133
136
} else {
@@ -136,16 +139,7 @@ impl ElasticsearchCommon {
136
139
ElasticsearchApiVersion :: V7 => 7 ,
137
140
ElasticsearchApiVersion :: V8 => 8 ,
138
141
ElasticsearchApiVersion :: Auto => {
139
- match get_version (
140
- & base_url,
141
- & http_auth,
142
- & aws_auth,
143
- & region,
144
- & request,
145
- & tls_settings,
146
- proxy_config,
147
- )
148
- . await
142
+ match get_version ( & base_url, & auth, & request, & tls_settings, proxy_config) . await
149
143
{
150
144
Ok ( version) => {
151
145
debug ! ( message = "Auto-detected Elasticsearch API version." , %version) ;
@@ -195,15 +189,13 @@ impl ElasticsearchCommon {
195
189
} ;
196
190
197
191
Ok ( Self {
198
- http_auth ,
192
+ auth ,
199
193
base_url,
200
194
bulk_uri,
201
- aws_auth,
202
195
mode,
203
196
request_builder,
204
197
query_params,
205
198
request,
206
- region,
207
199
tls_settings,
208
200
metric_to_log,
209
201
} )
@@ -248,9 +240,7 @@ impl ElasticsearchCommon {
248
240
pub async fn healthcheck ( self , client : HttpClient ) -> crate :: Result < ( ) > {
249
241
match get (
250
242
& self . base_url ,
251
- & self . http_auth ,
252
- & self . aws_auth ,
253
- & self . region ,
243
+ & self . auth ,
254
244
& self . request ,
255
245
client,
256
246
"/_cluster/health" ,
@@ -264,6 +254,7 @@ impl ElasticsearchCommon {
264
254
}
265
255
}
266
256
257
+ #[ cfg( feature = "aws-core" ) ]
267
258
pub async fn sign_request (
268
259
request : & mut http:: Request < Bytes > ,
269
260
credentials_provider : & SharedCredentialsProvider ,
@@ -274,9 +265,7 @@ pub async fn sign_request(
274
265
275
266
async fn get_version (
276
267
base_url : & str ,
277
- http_auth : & Option < Auth > ,
278
- aws_auth : & Option < SharedCredentialsProvider > ,
279
- region : & Option < Region > ,
268
+ auth : & Option < Auth > ,
280
269
request : & RequestConfig ,
281
270
tls_settings : & TlsSettings ,
282
271
proxy_config : & ProxyConfig ,
@@ -291,7 +280,7 @@ async fn get_version(
291
280
}
292
281
293
282
let client = HttpClient :: new ( tls_settings. clone ( ) , proxy_config) ?;
294
- let response = get ( base_url, http_auth , aws_auth , region , request, client, "/" )
283
+ let response = get ( base_url, auth , request, client, "/" )
295
284
. await
296
285
. map_err ( |error| format ! ( "Failed to get Elasticsearch API version: {}" , error) ) ?;
297
286
@@ -314,28 +303,31 @@ async fn get_version(
314
303
315
304
async fn get (
316
305
base_url : & str ,
317
- http_auth : & Option < Auth > ,
318
- aws_auth : & Option < SharedCredentialsProvider > ,
319
- region : & Option < Region > ,
306
+ auth : & Option < Auth > ,
320
307
request : & RequestConfig ,
321
308
client : HttpClient ,
322
309
path : & str ,
323
310
) -> crate :: Result < Response < Body > > {
324
311
let mut builder = Request :: get ( format ! ( "{}{}" , base_url, path) ) ;
325
312
326
- if let Some ( authorization) = & http_auth {
327
- builder = authorization. apply_builder ( builder) ;
328
- }
329
-
330
313
for ( header, value) in & request. headers {
331
314
builder = builder. header ( & header[ ..] , & value[ ..] ) ;
332
315
}
333
-
334
316
let mut request = builder. body ( Bytes :: new ( ) ) ?;
335
317
336
- if let Some ( credentials_provider) = aws_auth {
337
- sign_request ( & mut request, credentials_provider, region) . await ?;
318
+ if let Some ( auth) = auth {
319
+ match auth {
320
+ Auth :: Basic ( http_auth) => {
321
+ http_auth. apply ( & mut request) ;
322
+ }
323
+ #[ cfg( feature = "aws-core" ) ]
324
+ Auth :: Aws { credentials_provider : provider, region } => {
325
+ let region = region. clone ( ) ;
326
+ sign_request ( & mut request, provider, & Some ( region) ) . await ?;
327
+ }
328
+ }
338
329
}
330
+
339
331
client
340
332
. send ( request. map ( hyper:: Body :: from) )
341
333
. await
0 commit comments