Skip to content

Commit d194992

Browse files
authored
fix(fluent source): fix ack message format (vectordotdev#17407)
1 parent c7d7cf8 commit d194992

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/sources/fluent/mod.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::io::{self, Read};
23
use std::net::SocketAddr;
34
use std::time::Duration;
@@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError};
910
use flate2::read::MultiGzDecoder;
1011
use lookup::lookup_v2::parse_value_path;
1112
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
12-
use rmp_serde::{decode, Deserializer};
13-
use serde::Deserialize;
13+
use rmp_serde::{decode, Deserializer, Serializer};
14+
use serde::{Deserialize, Serialize};
1415
use smallvec::{smallvec, SmallVec};
1516
use tokio_util::codec::Decoder;
1617
use vector_config::configurable_component;
@@ -532,15 +533,18 @@ impl TcpSourceAcker for FluentAcker {
532533
return None;
533534
}
534535

535-
let mut acks = String::new();
536+
let mut buf = Vec::new();
537+
let mut ser = Serializer::new(&mut buf);
538+
let mut ack_map = HashMap::new();
539+
536540
for chunk in self.chunks {
537-
let ack = match ack {
538-
TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk),
539-
_ => String::from("{}"),
541+
ack_map.clear();
542+
if let TcpSourceAck::Ack = ack {
543+
ack_map.insert("ack", chunk);
540544
};
541-
acks.push_str(&ack);
545+
ack_map.serialize(&mut ser).unwrap();
542546
}
543-
Some(acks.into())
547+
Some(buf.into())
544548
}
545549
}
546550

@@ -861,7 +865,8 @@ mod tests {
861865
async fn ack_delivered_with_chunk() {
862866
let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
863867
assert_eq!(result.unwrap().unwrap(), output.len());
864-
assert!(output.starts_with(b"{\"ack\":"));
868+
let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
869+
assert_eq!(output[..expected.len()], expected);
865870
}
866871

867872
#[tokio::test]
@@ -875,7 +880,8 @@ mod tests {
875880
async fn ack_failed_with_chunk() {
876881
let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
877882
assert_eq!(result.unwrap().unwrap(), output.len());
878-
assert_eq!(output, &b"{}"[..]);
883+
let expected: Vec<u8> = vec![0x80]; // { }
884+
assert_eq!(output, expected);
879885
}
880886

881887
async fn check_acknowledgements(

0 commit comments

Comments
 (0)