Skip to content

Commit c6362d2

Browse files
committed
fix(fluent source): fix ack message format
Fluent ack messages were sent as plain JSON instead of msgpack format. Signed-off-by: Benoît GARNIER <[email protected]>
1 parent 6088abd commit c6362d2

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

src/sources/fluent/mod.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::io::{self, Read};
23
use std::net::SocketAddr;
34
use std::time::Duration;
@@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError};
910
use flate2::read::MultiGzDecoder;
1011
use lookup::lookup_v2::parse_value_path;
1112
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
12-
use rmp_serde::{decode, Deserializer};
13-
use serde::Deserialize;
13+
use rmp_serde::{decode, Deserializer, Serializer};
14+
use serde::{Deserialize, Serialize};
1415
use smallvec::{smallvec, SmallVec};
1516
use tokio_util::codec::Decoder;
1617
use vector_config::configurable_component;
@@ -532,15 +533,17 @@ impl TcpSourceAcker for FluentAcker {
532533
return None;
533534
}
534535

535-
let mut acks = String::new();
536+
let mut buf = Vec::new();
537+
let mut ser = Serializer::new(&mut buf);
538+
536539
for chunk in self.chunks {
537-
let ack = match ack {
538-
TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk),
539-
_ => String::from("{}"),
540+
let mut ackmap = HashMap::new();
541+
if let TcpSourceAck::Ack = ack {
542+
ackmap.insert("ack", chunk);
540543
};
541-
acks.push_str(&ack);
544+
ackmap.serialize(&mut ser).unwrap();
542545
}
543-
Some(acks.into())
546+
Some(buf.into())
544547
}
545548
}
546549

@@ -861,7 +864,8 @@ mod tests {
861864
async fn ack_delivered_with_chunk() {
862865
let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
863866
assert_eq!(result.unwrap().unwrap(), output.len());
864-
assert!(output.starts_with(b"{\"ack\":"));
867+
let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
868+
assert_eq!(output[..expected.len()], expected);
865869
}
866870

867871
#[tokio::test]
@@ -875,7 +879,8 @@ mod tests {
875879
async fn ack_failed_with_chunk() {
876880
let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
877881
assert_eq!(result.unwrap().unwrap(), output.len());
878-
assert_eq!(output, &b"{}"[..]);
882+
let expected: Vec<u8> = vec![0x80]; // { }
883+
assert_eq!(output, expected);
879884
}
880885

881886
async fn check_acknowledgements(

0 commit comments

Comments
 (0)