Skip to content

Commit 4aa3eca

Browse files
committed
Try item sized batching
1 parent 44c99b3 commit 4aa3eca

File tree

5 files changed

+30
-4
lines changed

5 files changed

+30
-4
lines changed

src/sinks/http/batch.rs

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use codecs::encoding::Framer;
2+
use vector_core::{
3+
event::Event, stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf,
4+
};
5+
6+
use crate::codecs::Encoder;
7+
8+
/// Batch sizer for the HTTP sink.
///
/// Holds the sink's encoder so that the `ItemBatchSize` implementation can pick a
/// size calculation that depends on the configured serializer.
#[derive(Default)]
9+
pub(super) struct HttpBatchSizer {
10+
/// Encoder whose serializer is inspected when sizing an event for batching.
pub(super) encoder: Encoder<Framer>,
11+
}
12+
13+
/// Sizes individual events for batching, choosing the calculation from the
/// encoder's serializer.
impl ItemBatchSize<Event> for HttpBatchSizer {
14+
/// Returns the size attributed to `item` when accumulating a batch.
///
/// When the encoder's serializer is the `Json` variant, the estimated
/// JSON-encoded size of the event is used; every other serializer falls back
/// to `item.size_of()`.
fn size(&self, item: &Event) -> usize {
15+
match self.encoder.serializer() {
16+
// JSON output: use the JSON size estimate rather than the generic size.
codecs::encoding::Serializer::Json(_) => item.estimated_json_encoded_size_of().get(),
17+
// NOTE(review): presumably `size_of` comes from the imported `ByteSizeOf`
// trait (in-memory size) — confirm against vector_core.
_ => item.size_of(),
18+
}
19+
}
20+
}

src/sinks/http/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ pub(super) fn validate_payload_wrapper(
225225
#[typetag::serde(name = "http")]
226226
impl SinkConfig for HttpSinkConfig {
227227
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
228-
let batch_settings = self.batch.into_batcher_settings()?;
228+
let batch_settings = self.batch.validate()?.into_batcher_settings()?;
229229

230230
let encoder = self.build_encoder()?;
231231
let transformer = self.encoding.transformer();

src/sinks/http/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for
44
//! taking a stream of [`vector_core::event::Event`]s and forwarding them to an HTTP server.
55
6+
mod batch;
67
mod config;
78
mod encoder;
89
mod request_builder;

src/sinks/http/sink.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use crate::sinks::{
55
util::http_service::{HttpRetryLogic, HttpService},
66
};
77

8-
use super::{request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder};
8+
use super::{
9+
batch::HttpBatchSizer, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
10+
};
911

1012
pub(super) struct HttpSink {
1113
service: Svc<HttpService<HttpSinkRequestBuilder>, HttpRetryLogic>,
@@ -29,8 +31,10 @@ impl HttpSink {
2931

3032
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
3133
input
32-
// Batch the input stream with byte size calculation.
33-
.batched(self.batch_settings.into_byte_size_config())
34+
// Batch the input stream with size calculation dependent on the configured codec
35+
.batched(self.batch_settings.into_item_size_config(HttpBatchSizer {
36+
encoder: self.request_builder.encoder.encoder.clone(),
37+
}))
3438
// Build requests with no concurrency limit.
3539
.request_builder(None, self.request_builder)
3640
// Filter out any errors that occurred in the request building.

src/sinks/util/http_service/service.rs

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ where
7979
fn call(&mut self, request: HttpRequest) -> Self::Future {
8080
// for internal metrics reporting
8181
let raw_byte_size = request.payload.len();
82+
8283
let events_byte_size = request
8384
.request_metadata
8485
.into_events_estimated_json_encoded_byte_size();

0 commit comments

Comments
 (0)