@@ -13,6 +13,7 @@ use flate2::read::MultiGzDecoder;
13
13
use futures:: FutureExt ;
14
14
use http:: StatusCode ;
15
15
use hyper:: { service:: make_service_fn, Server } ;
16
+ use serde:: de:: DeserializeOwned ;
16
17
use serde:: Serialize ;
17
18
use serde_json:: {
18
19
de:: { Read as JsonRead , StrRead } ,
@@ -35,6 +36,7 @@ use vector_lib::{
35
36
use vector_lib:: { configurable:: configurable_component, tls:: MaybeTlsIncomingStream } ;
36
37
use vrl:: path:: OwnedTargetPath ;
37
38
use vrl:: value:: { kind:: Collection , Kind } ;
39
+ use warp:: http:: header:: { HeaderValue , CONTENT_TYPE } ;
38
40
use warp:: { filters:: BoxedFilter , path, reject:: Rejection , reply:: Response , Filter , Reply } ;
39
41
40
42
use self :: {
@@ -529,26 +531,50 @@ impl SplunkSource {
529
531
. boxed ( )
530
532
}
531
533
534
+ fn lenient_json_content_type_check < T > ( ) -> impl Filter < Extract = ( T , ) , Error = Rejection > + Clone
535
+ where
536
+ T : Send + DeserializeOwned + ' static ,
537
+ {
538
+ warp:: header:: optional :: < HeaderValue > ( CONTENT_TYPE . as_str ( ) )
539
+ . and ( warp:: body:: bytes ( ) )
540
+ . and_then (
541
+ |ctype : Option < HeaderValue > , body : bytes:: Bytes | async move {
542
+ let ok = ctype
543
+ . as_ref ( )
544
+ . and_then ( |v| v. to_str ( ) . ok ( ) )
545
+ . map ( |h| h. to_ascii_lowercase ( ) . contains ( "application/json" ) )
546
+ . unwrap_or ( true ) ;
547
+
548
+ if !ok {
549
+ return Err ( warp:: reject:: custom ( ApiError :: UnsupportedContentType ) ) ;
550
+ }
551
+
552
+ let value = serde_json:: from_slice :: < T > ( & body)
553
+ . map_err ( |_| warp:: reject:: custom ( ApiError :: BadRequest ) ) ?;
554
+
555
+ Ok ( value)
556
+ } ,
557
+ )
558
+ }
559
+
532
560
fn ack_service ( & self ) -> BoxedFilter < ( Response , ) > {
533
561
let idx_ack = self . idx_ack . clone ( ) ;
562
+
534
563
warp:: post ( )
535
- . and ( path ! ( "ack" ) )
564
+ . and ( warp :: path!( "ack" ) )
536
565
. and ( self . authorization ( ) )
537
566
. and ( SplunkSource :: required_channel ( ) )
538
- . and ( warp :: body :: json ( ) )
539
- . and_then ( move |_, channel_id : String , body : HecAckStatusRequest | {
567
+ . and ( Self :: lenient_json_content_type_check :: < HecAckStatusRequest > ( ) )
568
+ . and_then ( move |_, channel : String , req : HecAckStatusRequest | {
540
569
let idx_ack = idx_ack. clone ( ) ;
541
570
async move {
542
571
if let Some ( idx_ack) = idx_ack {
543
- let ack_statuses = idx_ack
544
- . get_acks_status_from_channel ( channel_id , & body . acks )
572
+ let acks = idx_ack
573
+ . get_acks_status_from_channel ( channel , & req . acks )
545
574
. await ?;
546
- Ok (
547
- warp:: reply:: json ( & HecAckStatusResponse { acks : ack_statuses } )
548
- . into_response ( ) ,
549
- )
575
+ Ok ( warp:: reply:: json ( & HecAckStatusResponse { acks } ) . into_response ( ) )
550
576
} else {
551
- Err ( Rejection :: from ( ApiError :: AckIsDisabled ) )
577
+ Err ( warp :: reject :: custom ( ApiError :: AckIsDisabled ) )
552
578
}
553
579
}
554
580
} )
@@ -1094,6 +1120,7 @@ pub(crate) enum ApiError {
1094
1120
MissingAuthorization ,
1095
1121
InvalidAuthorization ,
1096
1122
UnsupportedEncoding ,
1123
+ UnsupportedContentType ,
1097
1124
MissingChannel ,
1098
1125
NoData ,
1099
1126
InvalidDataFormat { event : usize } ,
@@ -1188,6 +1215,14 @@ fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1188
1215
response_json ( StatusCode :: OK , body)
1189
1216
}
1190
1217
1218
+ fn response_plain ( code : StatusCode , msg : & ' static str ) -> Response {
1219
+ warp:: reply:: with_status (
1220
+ warp:: reply:: with_header ( msg, http:: header:: CONTENT_TYPE , "text/plain; charset=utf-8" ) ,
1221
+ code,
1222
+ )
1223
+ . into_response ( )
1224
+ }
1225
+
1191
1226
async fn finish_err ( rejection : Rejection ) -> Result < ( Response , ) , Rejection > {
1192
1227
if let Some ( & error) = rejection. find :: < ApiError > ( ) {
1193
1228
emit ! ( SplunkHecRequestError { error } ) ;
@@ -1200,6 +1235,10 @@ async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1200
1235
splunk_response:: INVALID_AUTHORIZATION ,
1201
1236
) ,
1202
1237
ApiError :: UnsupportedEncoding => empty_response ( StatusCode :: UNSUPPORTED_MEDIA_TYPE ) ,
1238
+ ApiError :: UnsupportedContentType => response_plain (
1239
+ StatusCode :: UNSUPPORTED_MEDIA_TYPE ,
1240
+ "The request's content-type is not supported" ,
1241
+ ) ,
1203
1242
ApiError :: MissingChannel => {
1204
1243
response_json ( StatusCode :: BAD_REQUEST , splunk_response:: NO_CHANNEL )
1205
1244
}
@@ -2485,6 +2524,51 @@ mod tests {
2485
2524
assert ! ( ack_res. acks. get( & event_res. ack_id) . unwrap( ) ) ;
2486
2525
}
2487
2526
2527
+ #[ tokio:: test]
2528
+ async fn ack_service_accepts_parameterized_content_type ( ) {
2529
+ let ack_config = HecAcknowledgementsConfig {
2530
+ enabled : Some ( true ) ,
2531
+ ..Default :: default ( )
2532
+ } ;
2533
+ let ( source, address) = source ( Some ( ack_config) ) . await ;
2534
+ let opts = SendWithOpts {
2535
+ channel : Some ( Channel :: Header ( "guid" ) ) ,
2536
+ forwarded_for : None ,
2537
+ } ;
2538
+
2539
+ let event_res = send_with_response (
2540
+ address,
2541
+ "services/collector/event" ,
2542
+ r#"{"event":"param-test"}"# ,
2543
+ TOKEN ,
2544
+ & opts,
2545
+ )
2546
+ . await
2547
+ . json :: < HecAckEventResponse > ( )
2548
+ . await
2549
+ . unwrap ( ) ;
2550
+ let _ = collect_n ( source, 1 ) . await ;
2551
+
2552
+ let body = serde_json:: to_string ( & HecAckStatusRequest {
2553
+ acks : vec ! [ event_res. ack_id] ,
2554
+ } )
2555
+ . unwrap ( ) ;
2556
+
2557
+ let res = reqwest:: Client :: new ( )
2558
+ . post ( format ! ( "http://{}/services/collector/ack" , address) )
2559
+ . header ( "Authorization" , format ! ( "Splunk {}" , TOKEN ) )
2560
+ . header ( "x-splunk-request-channel" , "guid" )
2561
+ . header ( "Content-Type" , "application/json; some-random-text; hello" )
2562
+ . body ( body)
2563
+ . send ( )
2564
+ . await
2565
+ . unwrap ( ) ;
2566
+
2567
+ assert_eq ! ( 200 , res. status( ) . as_u16( ) ) ;
2568
+
2569
+ let _parsed: HecAckStatusResponse = res. json ( ) . await . unwrap ( ) ;
2570
+ }
2571
+
2488
2572
#[ tokio:: test]
2489
2573
async fn event_service_acknowledgements_enabled_channel_required ( ) {
2490
2574
let message = r#"{"event":"first", "color": "blue"}"# ;
0 commit comments