Skip to content

Commit ced219e

Browse files
author
Andrey Koshchiy
authored
enhancement(compression): zstd compression support (vectordotdev#17371)
1 parent f523f70 commit ced219e

29 files changed

+455 / -121 lines changed

src/sinks/aws_s3/integration_tests.rs

+52
Original file line number | Diff line number | Diff line change
@@ -252,6 +252,51 @@ async fn s3_gzip() {
252252
assert_eq!(lines, response_lines);
253253
}
254254

255+
#[tokio::test]
256+
async fn s3_zstd() {
257+
// Here, we're creating a bunch of events, approximately 3000, while setting our batch size
258+
// to 1000, and using zstd compression. We test to ensure that all of the keys we end up
259+
// writing represent the sum total of the lines: we expect 3 batches, each of which should
260+
// have 1000 lines.
261+
let cx = SinkContext::new_test();
262+
263+
let bucket = uuid::Uuid::new_v4().to_string();
264+
265+
create_bucket(&bucket, false).await;
266+
267+
let batch_size = 1_000;
268+
let batch_multiplier = 3;
269+
let config = S3SinkConfig {
270+
compression: Compression::zstd_default(),
271+
filename_time_format: "%s%f".into(),
272+
..config(&bucket, batch_size)
273+
};
274+
275+
let prefix = config.key_prefix.clone();
276+
let service = config.create_service(&cx.globals.proxy).await.unwrap();
277+
let sink = config.build_processor(service).unwrap();
278+
279+
let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
280+
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
281+
assert_eq!(receiver.await, BatchStatus::Delivered);
282+
283+
let keys = get_keys(&bucket, prefix).await;
284+
assert_eq!(keys.len(), batch_multiplier);
285+
286+
let mut response_lines: Vec<String> = Vec::new();
287+
let mut key_stream = stream::iter(keys);
288+
while let Some(key) = key_stream.next().await {
289+
assert!(key.ends_with(".log.zst"));
290+
291+
let obj = get_object(&bucket, key).await;
292+
assert_eq!(obj.content_encoding, Some("zstd".to_string()));
293+
294+
response_lines.append(&mut get_zstd_lines(obj).await);
295+
}
296+
297+
assert_eq!(lines, response_lines);
298+
}
299+
255300
// NOTE: this test doesn't actually validate anything because localstack
256301
// doesn't enforce the required Content-MD5 header on the request for
257302
// buckets with object lock enabled
@@ -481,6 +526,13 @@ async fn get_gzipped_lines(obj: GetObjectOutput) -> Vec<String> {
481526
buf_read.lines().map(|l| l.unwrap()).collect()
482527
}
483528

529+
async fn get_zstd_lines(obj: GetObjectOutput) -> Vec<String> {
530+
let body = get_object_output_body(obj).await;
531+
let decoder = zstd::Decoder::new(body).expect("zstd decoder initialization failed");
532+
let buf_read = BufReader::new(decoder);
533+
buf_read.lines().map(|l| l.unwrap()).collect()
534+
}
535+
484536
async fn get_object_output_body(obj: GetObjectOutput) -> impl std::io::Read {
485537
obj.body.collect().await.unwrap().reader()
486538
}

src/sinks/azure_blob/request_builder.rs

+1
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,7 @@ impl Compression {
106106
Self::None => "text/plain",
107107
Self::Gzip(_) => "application/gzip",
108108
Self::Zlib(_) => "application/zlib",
109+
Self::Zstd(_) => "application/zstd",
109110
}
110111
}
111112
}

src/sinks/http.rs

+72-33
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@ use std::io::Write;
22

33
use bytes::{BufMut, Bytes, BytesMut};
44
use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer};
5-
use flate2::write::{GzEncoder, ZlibEncoder};
65
use futures::{future, FutureExt, SinkExt};
76
use http::{
87
header::{HeaderName, HeaderValue, AUTHORIZATION},
@@ -23,7 +22,7 @@ use crate::{
2322
sinks::util::{
2423
self,
2524
http::{BatchedHttpSink, HttpEventEncoder, RequestConfig},
26-
BatchConfig, Buffer, Compression, RealtimeSizeBasedDefaultBatchSettings,
25+
BatchConfig, Buffer, Compression, Compressor, RealtimeSizeBasedDefaultBatchSettings,
2726
TowerRequestConfig, UriSerde,
2827
},
2928
tls::{TlsConfig, TlsSettings},
@@ -375,24 +374,21 @@ impl util::http::HttpSink for HttpSink {
375374
builder = builder.header("Content-Type", content_type);
376375
}
377376

378-
match self.compression {
379-
Compression::Gzip(level) => {
380-
builder = builder.header("Content-Encoding", "gzip");
381-
382-
let buffer = BytesMut::new();
383-
let mut w = GzEncoder::new(buffer.writer(), level.as_flate2());
384-
w.write_all(&body).expect("Writing to Vec can't fail");
385-
body = w.finish().expect("Writing to Vec can't fail").into_inner();
386-
}
387-
Compression::Zlib(level) => {
388-
builder = builder.header("Content-Encoding", "deflate");
389-
390-
let buffer = BytesMut::new();
391-
let mut w = ZlibEncoder::new(buffer.writer(), level.as_flate2());
392-
w.write_all(&body).expect("Writing to Vec can't fail");
393-
body = w.finish().expect("Writing to Vec can't fail").into_inner();
394-
}
395-
Compression::None => {}
377+
let compression = self.compression;
378+
379+
if compression.is_compressed() {
380+
builder = builder.header(
381+
"Content-Encoding",
382+
compression
383+
.content_encoding()
384+
.expect("Encoding should be specified."),
385+
);
386+
387+
let mut compressor = Compressor::from(compression);
388+
compressor
389+
.write_all(&body)
390+
.expect("Writing to Vec can't fail.");
391+
body = compressor.finish().expect("Writing to Vec can't fail.");
396392
}
397393

398394
let headers = builder
@@ -477,12 +473,12 @@ mod tests {
477473
encoding::FramingConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig,
478474
TextSerializerConfig,
479475
};
480-
use flate2::read::MultiGzDecoder;
476+
use flate2::{read::MultiGzDecoder, read::ZlibDecoder};
481477
use futures::{channel::mpsc, stream, StreamExt};
482478
use headers::{Authorization, HeaderMapExt};
483479
use http::request::Parts;
484480
use hyper::{Method, Response, StatusCode};
485-
use serde::Deserialize;
481+
use serde::{de, Deserialize};
486482
use vector_core::event::{BatchNotifier, BatchStatus, LogEvent};
487483

488484
use super::*;
@@ -812,15 +808,44 @@ mod tests {
812808
}
813809

814810
#[tokio::test]
815-
async fn json_compression() {
811+
async fn json_gzip_compression() {
812+
json_compression("gzip").await;
813+
}
814+
815+
#[tokio::test]
816+
async fn json_zstd_compression() {
817+
json_compression("zstd").await;
818+
}
819+
820+
#[tokio::test]
821+
async fn json_zlib_compression() {
822+
json_compression("zlib").await;
823+
}
824+
825+
#[tokio::test]
826+
async fn json_gzip_compression_with_payload_wrapper() {
827+
json_compression_with_payload_wrapper("gzip").await;
828+
}
829+
830+
#[tokio::test]
831+
async fn json_zlib_compression_with_payload_wrapper() {
832+
json_compression_with_payload_wrapper("zlib").await;
833+
}
834+
835+
#[tokio::test]
836+
async fn json_zstd_compression_with_payload_wrapper() {
837+
json_compression_with_payload_wrapper("zstd").await;
838+
}
839+
840+
async fn json_compression(compression: &str) {
816841
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
817842
let num_lines = 1000;
818843

819844
let in_addr = next_addr();
820845

821846
let config = r#"
822847
uri = "http://$IN_ADDR/frames"
823-
compression = "gzip"
848+
compression = "$COMPRESSION"
824849
encoding.codec = "json"
825850
method = "post"
826851
@@ -829,7 +854,9 @@ mod tests {
829854
user = "waldo"
830855
password = "hunter2"
831856
"#
832-
.replace("$IN_ADDR", &in_addr.to_string());
857+
.replace("$IN_ADDR", &in_addr.to_string())
858+
.replace("$COMPRESSION", compression);
859+
833860
let config: HttpSinkConfig = toml::from_str(&config).unwrap();
834861

835862
let cx = SinkContext::new_test();
@@ -856,8 +883,7 @@ mod tests {
856883
Some(Authorization::basic("waldo", "hunter2")),
857884
parts.headers.typed_get()
858885
);
859-
let lines: Vec<serde_json::Value> =
860-
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
886+
let lines: Vec<serde_json::Value> = parse_compressed_json(compression, body);
861887
stream::iter(lines)
862888
})
863889
.map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
@@ -870,16 +896,15 @@ mod tests {
870896
.await;
871897
}
872898

873-
#[tokio::test]
874-
async fn json_compression_with_payload_wrapper() {
899+
async fn json_compression_with_payload_wrapper(compression: &str) {
875900
components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
876901
let num_lines = 1000;
877902

878903
let in_addr = next_addr();
879904

880905
let config = r#"
881906
uri = "http://$IN_ADDR/frames"
882-
compression = "gzip"
907+
compression = "$COMPRESSION"
883908
encoding.codec = "json"
884909
payload_prefix = '{"data":'
885910
payload_suffix = "}"
@@ -890,7 +915,9 @@ mod tests {
890915
user = "waldo"
891916
password = "hunter2"
892917
"#
893-
.replace("$IN_ADDR", &in_addr.to_string());
918+
.replace("$IN_ADDR", &in_addr.to_string())
919+
.replace("$COMPRESSION", compression);
920+
894921
let config: HttpSinkConfig = toml::from_str(&config).unwrap();
895922

896923
let cx = SinkContext::new_test();
@@ -918,8 +945,8 @@ mod tests {
918945
parts.headers.typed_get()
919946
);
920947

921-
let message: serde_json::Value =
922-
serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
948+
let message: serde_json::Value = parse_compressed_json(compression, body);
949+
923950
let lines: Vec<serde_json::Value> =
924951
message["data"].as_array().unwrap().to_vec();
925952
stream::iter(lines)
@@ -934,6 +961,18 @@ mod tests {
934961
.await;
935962
}
936963

964+
fn parse_compressed_json<T>(compression: &str, buf: Bytes) -> T
965+
where
966+
T: de::DeserializeOwned,
967+
{
968+
match compression {
969+
"gzip" => serde_json::from_reader(MultiGzDecoder::new(buf.reader())).unwrap(),
970+
"zstd" => serde_json::from_reader(zstd::Decoder::new(buf.reader()).unwrap()).unwrap(),
971+
"zlib" => serde_json::from_reader(ZlibDecoder::new(buf.reader())).unwrap(),
972+
_ => panic!("undefined compression: {}", compression),
973+
}
974+
}
975+
937976
async fn get_received(
938977
rx: mpsc::Receiver<(Parts, Bytes)>,
939978
assert_parts: impl Fn(Parts),

0 commit comments

Comments
 (0)