Skip to content

Commit 68ff83c

Browse files
committed
Refactor a bit
1 parent 4aa3eca commit 68ff83c

File tree

11 files changed

+260
-359
lines changed

11 files changed

+260
-359
lines changed

src/sinks/http/config.rs

+38-15
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,27 @@ use indexmap::IndexMap;
1010

1111
use crate::{
1212
codecs::{EncodingConfigWithFraming, SinkType},
13-
http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth},
13+
http::{Auth, HttpClient, MaybeAuth},
1414
sinks::{
1515
prelude::*,
1616
util::{
17-
http::RequestConfig,
18-
http_service::{HttpRetryLogic, HttpService},
17+
http::{HttpStatusRetryLogic, RequestConfig},
1918
RealtimeSizeBasedDefaultBatchSettings, UriSerde,
2019
},
2120
},
2221
};
2322

2423
use super::{
25-
encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
24+
encoder::HttpEncoder,
25+
request_builder::HttpRequestBuilder,
26+
service::{HttpResponse, HttpService, HttpSinkRequestBuilder},
2627
sink::HttpSink,
2728
};
2829

30+
const CONTENT_TYPE_TEXT: &str = "text/plain";
31+
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
32+
const CONTENT_TYPE_JSON: &str = "application/json";
33+
2934
/// Configuration for the `http` sink.
3035
#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
3136
#[derive(Clone, Debug)]
@@ -238,10 +243,6 @@ impl SinkConfig for HttpSinkConfig {
238243
let (payload_prefix, payload_suffix) =
239244
validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
240245

241-
let endpoint = self.uri.with_default_parts();
242-
243-
let protocol = get_http_scheme_from_uri(&endpoint.uri);
244-
245246
let client = self.build_http_client(&cx)?;
246247

247248
let healthcheck = match cx.healthcheck.uri {
@@ -251,27 +252,49 @@ impl SinkConfig for HttpSinkConfig {
251252
None => future::ok(()).boxed(),
252253
};
253254

255+
let content_type = {
256+
use Framer::*;
257+
use Serializer::*;
258+
match (encoder.serializer(), encoder.framer()) {
259+
(RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
260+
(Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
261+
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
262+
Some(CONTENT_TYPE_JSON.to_owned())
263+
}
264+
_ => None,
265+
}
266+
};
267+
254268
let request_builder = HttpRequestBuilder {
255-
encoder: HttpEncoder::new(encoder.clone(), transformer),
269+
encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
270+
compression: self.compression,
256271
};
257272

273+
let content_encoding = self.compression.is_compressed().then(|| {
274+
self.compression
275+
.content_encoding()
276+
.expect("Encoding should be specified for compression.")
277+
.to_string()
278+
});
279+
258280
let http_service_request_builder = HttpSinkRequestBuilder {
259281
uri: self.uri.with_default_parts(),
260282
method: self.method,
261283
auth: self.auth.choose_one(&self.uri.auth)?,
262284
headers,
263-
payload_prefix,
264-
payload_suffix,
265-
compression: self.compression,
266-
encoder,
285+
content_type,
286+
content_encoding,
267287
};
268288

269-
let service = HttpService::new(http_service_request_builder, client, protocol.to_string());
289+
let service = HttpService::new(client, http_service_request_builder);
270290

271291
let request_limits = self.request.tower.unwrap_with(&Default::default());
272292

293+
let retry_logic =
294+
HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status());
295+
273296
let service = ServiceBuilder::new()
274-
.settings(request_limits, HttpRetryLogic)
297+
.settings(request_limits, retry_logic)
275298
.service(service);
276299

277300
let sink = HttpSink::new(service, batch_settings, request_builder);

src/sinks/http/encoder.rs

+45-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,15 @@ use crate::{
44
event::Event,
55
sinks::util::encoding::{write_all, Encoder as SinkEncoder},
66
};
7-
use bytes::BytesMut;
8-
use codecs::encoding::Framer;
7+
use bytes::{BufMut, BytesMut};
8+
use codecs::{
9+
encoding::{
10+
Framer,
11+
Framer::{CharacterDelimited, NewlineDelimited},
12+
Serializer::Json,
13+
},
14+
CharacterDelimitedEncoder,
15+
};
916
use std::io;
1017
use tokio_util::codec::Encoder as _;
1118

@@ -15,14 +22,23 @@ use crate::sinks::prelude::*;
1522
pub(super) struct HttpEncoder {
1623
pub(super) encoder: Encoder<Framer>,
1724
pub(super) transformer: Transformer,
25+
pub(super) payload_prefix: String,
26+
pub(super) payload_suffix: String,
1827
}
1928

2029
impl HttpEncoder {
2130
/// Creates a new `HttpEncoder`.
22-
pub(super) const fn new(encoder: Encoder<Framer>, transformer: Transformer) -> Self {
31+
pub(super) const fn new(
32+
encoder: Encoder<Framer>,
33+
transformer: Transformer,
34+
payload_prefix: String,
35+
payload_suffix: String,
36+
) -> Self {
2337
Self {
2438
encoder,
2539
transformer,
40+
payload_prefix,
41+
payload_suffix,
2642
}
2743
}
2844
}
@@ -47,6 +63,32 @@ impl SinkEncoder<Vec<Event>> for HttpEncoder {
4763
.map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode event"))?;
4864
}
4965

66+
match (self.encoder.serializer(), self.encoder.framer()) {
67+
(Json(_), NewlineDelimited(_)) => {
68+
if !body.is_empty() {
69+
// Remove trailing newline for backwards-compatibility
70+
// with Vector `0.20.x`.
71+
body.truncate(body.len() - 1);
72+
}
73+
}
74+
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
75+
// TODO(https://github.com/vectordotdev/vector/issues/11253):
76+
// Prepend before building a request body to eliminate the
77+
// additional copy here.
78+
let message = body.split();
79+
body.put(self.payload_prefix.as_bytes());
80+
body.put_u8(b'[');
81+
if !message.is_empty() {
82+
body.unsplit(message);
83+
// remove trailing comma from last record
84+
body.truncate(body.len() - 1);
85+
}
86+
body.put_u8(b']');
87+
body.put(self.payload_suffix.as_bytes());
88+
}
89+
_ => {}
90+
}
91+
5092
let body = body.freeze();
5193

5294
write_all(writer, 1, body.as_ref()).map(|()| (body.len(), byte_size))

src/sinks/http/request_builder.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
use bytes::Bytes;
44
use std::io;
55

6-
use crate::sinks::{prelude::*, util::http_service::HttpRequest};
6+
use crate::sinks::prelude::*;
77

8-
use super::encoder::HttpEncoder;
8+
use super::{encoder::HttpEncoder, service::HttpRequest};
99

1010
pub(super) struct HttpRequestBuilder {
1111
pub(super) encoder: HttpEncoder,
12+
pub(super) compression: Compression,
1213
}
1314

1415
impl RequestBuilder<Vec<Event>> for HttpRequestBuilder {
@@ -20,8 +21,9 @@ impl RequestBuilder<Vec<Event>> for HttpRequestBuilder {
2021
type Error = io::Error;
2122

2223
fn compression(&self) -> Compression {
24+
self.compression
2325
// Compression is handled in the Service
24-
Compression::None
26+
//Compression::None
2527
}
2628

2729
fn encoder(&self) -> &Self::Encoder {

0 commit comments

Comments
 (0)