Skip to content

Commit 616930c

Browse files
Emory OtottEmory Otott
Emory Otott
authored and
Emory Otott
committed
#23022 Allow content-type header if it only includes application/json. Do not require exact match
1 parent 91245a4 commit 616930c

File tree

1 file changed

+28
-10
lines changed

1 file changed

+28
-10
lines changed

src/sources/splunk_hec/mod.rs

+28-10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStre
3636
use vrl::path::OwnedTargetPath;
3737
use vrl::value::{kind::Collection, Kind};
3838
use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};
39+
use warp::http::header::{HeaderValue, CONTENT_TYPE};
40+
use warp::reject;
3941

4042
use self::{
4143
acknowledgements::{
@@ -529,26 +531,42 @@ impl SplunkSource {
529531
.boxed()
530532
}
531533

534+
fn lenient_json_content_type_check(
535+
) -> impl Filter<Extract = (), Error = Rejection> + Clone + Send + Sync + 'static {
536+
warp::header::header::<HeaderValue>(CONTENT_TYPE.as_str())
537+
.and_then(|value: HeaderValue| async move {
538+
match value.to_str() {
539+
Ok(h) if h.to_lowercase().contains("application/json") => Ok(()),
540+
_ => Err(warp::Rejection::from(ApiError::UnsupportedEncoding)),
541+
}
542+
})
543+
.untuple_one()
544+
}
545+
532546
fn ack_service(&self) -> BoxedFilter<(Response,)> {
533547
let idx_ack = self.idx_ack.clone();
548+
534549
warp::post()
535-
.and(path!("ack"))
550+
.and(warp::path!("ack"))
536551
.and(self.authorization())
537552
.and(SplunkSource::required_channel())
538-
.and(warp::body::json())
539-
.and_then(move |_, channel_id: String, body: HecAckStatusRequest| {
553+
.and(Self::lenient_json_content_type_check())
554+
.and(warp::body::bytes())
555+
.and_then(move |_, channel: String, body_bytes: Bytes| {
540556
let idx_ack = idx_ack.clone();
541557
async move {
558+
// parse JSON, unit-variant BadRequest
559+
let req: HecAckStatusRequest =
560+
serde_json::from_slice(&body_bytes)
561+
.map_err(|_| reject::custom(ApiError::BadRequest))?;
562+
542563
if let Some(idx_ack) = idx_ack {
543-
let ack_statuses = idx_ack
544-
.get_acks_status_from_channel(channel_id, &body.acks)
564+
let acks = idx_ack
565+
.get_acks_status_from_channel(channel, &req.acks)
545566
.await?;
546-
Ok(
547-
warp::reply::json(&HecAckStatusResponse { acks: ack_statuses })
548-
.into_response(),
549-
)
567+
Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
550568
} else {
551-
Err(Rejection::from(ApiError::AckIsDisabled))
569+
Err(reject::custom(ApiError::AckIsDisabled))
552570
}
553571
}
554572
})

0 commit comments

Comments
 (0)