@@ -36,6 +36,8 @@ use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStre
36
36
use vrl:: path:: OwnedTargetPath ;
37
37
use vrl:: value:: { kind:: Collection , Kind } ;
38
38
use warp:: { filters:: BoxedFilter , path, reject:: Rejection , reply:: Response , Filter , Reply } ;
39
+ use warp:: http:: header:: { HeaderValue , CONTENT_TYPE } ;
40
+ use warp:: reject;
39
41
40
42
use self :: {
41
43
acknowledgements:: {
@@ -529,26 +531,42 @@ impl SplunkSource {
529
531
. boxed ( )
530
532
}
531
533
534
+ fn lenient_json_content_type_check (
535
+ ) -> impl Filter < Extract = ( ) , Error = Rejection > + Clone + Send + Sync + ' static {
536
+ warp:: header:: header :: < HeaderValue > ( CONTENT_TYPE . as_str ( ) )
537
+ . and_then ( |value : HeaderValue | async move {
538
+ match value. to_str ( ) {
539
+ Ok ( h) if h. to_lowercase ( ) . contains ( "application/json" ) => Ok ( ( ) ) ,
540
+ _ => Err ( warp:: Rejection :: from ( ApiError :: UnsupportedEncoding ) ) ,
541
+ }
542
+ } )
543
+ . untuple_one ( )
544
+ }
545
+
532
546
fn ack_service ( & self ) -> BoxedFilter < ( Response , ) > {
533
547
let idx_ack = self . idx_ack . clone ( ) ;
548
+
534
549
warp:: post ( )
535
- . and ( path ! ( "ack" ) )
550
+ . and ( warp :: path!( "ack" ) )
536
551
. and ( self . authorization ( ) )
537
552
. and ( SplunkSource :: required_channel ( ) )
538
- . and ( warp:: body:: json ( ) )
539
- . and_then ( move |_, channel_id : String , body : HecAckStatusRequest | {
553
+ . and ( Self :: lenient_json_content_type_check ( ) )
554
+ . and ( warp:: body:: bytes ( ) )
555
+ . and_then ( move |_, channel : String , body_bytes : Bytes | {
540
556
let idx_ack = idx_ack. clone ( ) ;
541
557
async move {
558
+ // parse JSON, unit-variant BadRequest
559
+ let req: HecAckStatusRequest =
560
+ serde_json:: from_slice ( & body_bytes)
561
+ . map_err ( |_| reject:: custom ( ApiError :: BadRequest ) ) ?;
562
+
542
563
if let Some ( idx_ack) = idx_ack {
543
- let ack_statuses = idx_ack
544
- . get_acks_status_from_channel ( channel_id , & body . acks )
564
+ let acks = idx_ack
565
+ . get_acks_status_from_channel ( channel , & req . acks )
545
566
. await ?;
546
- Ok (
547
- warp:: reply:: json ( & HecAckStatusResponse { acks : ack_statuses } )
548
- . into_response ( ) ,
549
- )
567
+ Ok ( warp:: reply:: json ( & HecAckStatusResponse { acks } ) . into_response ( ) )
550
568
} else {
551
- Err ( Rejection :: from ( ApiError :: AckIsDisabled ) )
569
+ Err ( reject :: custom ( ApiError :: AckIsDisabled ) )
552
570
}
553
571
}
554
572
} )
0 commit comments