Skip to content

Commit bf7d796

Browse files
authored
feat(codecs): Add lossy option to JSON deserializer (vectordotdev#17628)
Closes: vectordotdev#16406. Adds a `decoding.json.lossy` option. <!-- **Your PR title must conform to the conventional commit spec!** <type>(<scope>)!: <description> * `type` = chore, enhancement, feat, fix, docs * `!` = OPTIONAL: signals a breaking change * `scope` = Optional when `type` is "chore" or "docs", available scopes https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20 * `description` = short description of the change Examples: * enhancement(file source): Add `sort` option to sort discovered files * feat(new source): Initial `statsd` source * fix(file source): Fix a bug discovering new files * chore(external docs): Clarify `batch_size` option -->
1 parent 45a28f8 commit bf7d796

File tree

28 files changed

+1131
-686
lines changed

28 files changed

+1131
-686
lines changed

lib/codecs/src/decoding/format/json.rs

+92-15
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ use std::convert::TryInto;
22

33
use bytes::Bytes;
44
use chrono::Utc;
5+
use derivative::Derivative;
56
use lookup::PathPrefix;
67
use serde::{Deserialize, Serialize};
78
use smallvec::{smallvec, SmallVec};
9+
use vector_config::configurable_component;
810
use vector_core::{
911
config::{log_schema, DataType, LogNamespace},
1012
event::Event,
@@ -16,7 +18,36 @@ use super::Deserializer;
1618

1719
/// Config used to build a `JsonDeserializer`.
1820
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
19-
pub struct JsonDeserializerConfig;
21+
pub struct JsonDeserializerConfig {
22+
#[serde(
23+
default,
24+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
25+
)]
26+
/// Options for the JSON deserializer.
27+
pub json: JsonDeserializerOptions,
28+
}
29+
30+
/// JSON-specific decoding options.
31+
#[configurable_component]
32+
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
33+
#[derivative(Default)]
34+
pub struct JsonDeserializerOptions {
35+
/// Determines whether or not to replace invalid UTF-8 sequences instead of returning an error.
36+
///
37+
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
38+
///
39+
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
40+
#[serde(
41+
default = "default_lossy",
42+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
43+
)]
44+
#[derivative(Default(value = "default_lossy()"))]
45+
lossy: bool,
46+
}
47+
48+
const fn default_lossy() -> bool {
49+
true
50+
}
2051

2152
impl JsonDeserializerConfig {
2253
/// Build the `JsonDeserializer` from this configuration.
@@ -56,19 +87,23 @@ impl JsonDeserializerConfig {
5687

5788
impl JsonDeserializerConfig {
5889
/// Creates a new `JsonDeserializerConfig`.
59-
pub fn new() -> Self {
60-
Default::default()
90+
pub fn new(options: JsonDeserializerOptions) -> Self {
91+
Self { json: options }
6192
}
6293
}
6394

6495
/// Deserializer that builds `Event`s from a byte frame containing JSON.
65-
#[derive(Debug, Clone, Default)]
66-
pub struct JsonDeserializer;
96+
#[derive(Debug, Clone, Derivative)]
97+
#[derivative(Default)]
98+
pub struct JsonDeserializer {
99+
#[derivative(Default(value = "default_lossy()"))]
100+
lossy: bool,
101+
}
67102

68103
impl JsonDeserializer {
69104
/// Creates a new `JsonDeserializer`.
70-
pub fn new() -> Self {
71-
Default::default()
105+
pub fn new(lossy: bool) -> Self {
106+
Self { lossy }
72107
}
73108
}
74109

@@ -84,8 +119,11 @@ impl Deserializer for JsonDeserializer {
84119
return Ok(smallvec![]);
85120
}
86121

87-
let json: serde_json::Value = serde_json::from_slice(&bytes)
88-
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
122+
let json: serde_json::Value = match self.lossy {
123+
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
124+
false => serde_json::from_slice(&bytes),
125+
}
126+
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
89127

90128
// If the root is an Array, split it into multiple events
91129
let mut events = match json {
@@ -119,8 +157,10 @@ impl Deserializer for JsonDeserializer {
119157
}
120158

121159
impl From<&JsonDeserializerConfig> for JsonDeserializer {
122-
fn from(_: &JsonDeserializerConfig) -> Self {
123-
Self
160+
fn from(config: &JsonDeserializerConfig) -> Self {
161+
Self {
162+
lossy: config.json.lossy,
163+
}
124164
}
125165
}
126166

@@ -133,7 +173,7 @@ mod tests {
133173
#[test]
134174
fn deserialize_json() {
135175
let input = Bytes::from(r#"{ "foo": 123 }"#);
136-
let deserializer = JsonDeserializer::new();
176+
let deserializer = JsonDeserializer::default();
137177

138178
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
139179
let events = deserializer.parse(input.clone(), namespace).unwrap();
@@ -160,7 +200,7 @@ mod tests {
160200
#[test]
161201
fn deserialize_json_array() {
162202
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
163-
let deserializer = JsonDeserializer::new();
203+
let deserializer = JsonDeserializer::default();
164204
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
165205
let events = deserializer.parse(input.clone(), namespace).unwrap();
166206
let mut events = events.into_iter();
@@ -197,7 +237,7 @@ mod tests {
197237
#[test]
198238
fn deserialize_skip_empty() {
199239
let input = Bytes::from("");
200-
let deserializer = JsonDeserializer::new();
240+
let deserializer = JsonDeserializer::default();
201241

202242
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
203243
let events = deserializer.parse(input.clone(), namespace).unwrap();
@@ -208,7 +248,44 @@ mod tests {
208248
#[test]
209249
fn deserialize_error_invalid_json() {
210250
let input = Bytes::from("{ foo");
211-
let deserializer = JsonDeserializer::new();
251+
let deserializer = JsonDeserializer::default();
252+
253+
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
254+
assert!(deserializer.parse(input.clone(), namespace).is_err());
255+
}
256+
}
257+
258+
#[test]
259+
fn deserialize_lossy_replace_invalid_utf8() {
260+
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
261+
let deserializer = JsonDeserializer::new(true);
262+
263+
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
264+
let events = deserializer.parse(input.clone(), namespace).unwrap();
265+
let mut events = events.into_iter();
266+
267+
{
268+
let event = events.next().unwrap();
269+
let log = event.as_log();
270+
assert_eq!(log["foo"], b"Hello \xEF\xBF\xBDWorld".into());
271+
assert_eq!(
272+
log.get((
273+
lookup::PathPrefix::Event,
274+
log_schema().timestamp_key().unwrap()
275+
))
276+
.is_some(),
277+
namespace == LogNamespace::Legacy
278+
);
279+
}
280+
281+
assert_eq!(events.next(), None);
282+
}
283+
}
284+
285+
#[test]
286+
fn deserialize_non_lossy_error_invalid_utf8() {
287+
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
288+
let deserializer = JsonDeserializer::new(false);
212289

213290
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
214291
assert!(deserializer.parse(input.clone(), namespace).is_err());

lib/codecs/src/decoding/format/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod syslog;
1414
use ::bytes::Bytes;
1515
use dyn_clone::DynClone;
1616
pub use gelf::{GelfDeserializer, GelfDeserializerConfig};
17-
pub use json::{JsonDeserializer, JsonDeserializerConfig};
17+
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
1818
pub use native::{NativeDeserializer, NativeDeserializerConfig};
1919
pub use native_json::{NativeJsonDeserializer, NativeJsonDeserializerConfig};
2020
use smallvec::SmallVec;

lib/codecs/src/decoding/mod.rs

+26-12
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use bytes::{Bytes, BytesMut};
99
pub use error::StreamDecodingError;
1010
pub use format::{
1111
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
12-
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, NativeDeserializer,
13-
NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig,
12+
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
13+
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
14+
NativeJsonDeserializerConfig,
1415
};
1516
#[cfg(feature = "syslog")]
1617
pub use format::{SyslogDeserializer, SyslogDeserializerConfig};
@@ -243,7 +244,14 @@ pub enum DeserializerConfig {
243244
/// Decodes the raw bytes as [JSON][json].
244245
///
245246
/// [json]: https://www.json.org/
246-
Json,
247+
Json {
248+
/// Options for the JSON deserializer.
249+
#[serde(
250+
default,
251+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
252+
)]
253+
json: JsonDeserializerOptions,
254+
},
247255

248256
#[cfg(feature = "syslog")]
249257
/// Decodes the raw bytes as a Syslog message.
@@ -284,8 +292,8 @@ impl From<BytesDeserializerConfig> for DeserializerConfig {
284292
}
285293

286294
impl From<JsonDeserializerConfig> for DeserializerConfig {
287-
fn from(_: JsonDeserializerConfig) -> Self {
288-
Self::Json
295+
fn from(config: JsonDeserializerConfig) -> Self {
296+
Self::Json { json: config.json }
289297
}
290298
}
291299

@@ -307,7 +315,9 @@ impl DeserializerConfig {
307315
pub fn build(&self) -> Deserializer {
308316
match self {
309317
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
310-
DeserializerConfig::Json => Deserializer::Json(JsonDeserializerConfig.build()),
318+
DeserializerConfig::Json { json } => {
319+
Deserializer::Json(JsonDeserializerConfig::new(json.clone()).build())
320+
}
311321
#[cfg(feature = "syslog")]
312322
DeserializerConfig::Syslog => {
313323
Deserializer::Syslog(SyslogDeserializerConfig::default().build())
@@ -325,7 +335,7 @@ impl DeserializerConfig {
325335
match self {
326336
DeserializerConfig::Native => FramingConfig::LengthDelimited,
327337
DeserializerConfig::Bytes
328-
| DeserializerConfig::Json
338+
| DeserializerConfig::Json { .. }
329339
| DeserializerConfig::Gelf
330340
| DeserializerConfig::NativeJson => FramingConfig::NewlineDelimited {
331341
newline_delimited: Default::default(),
@@ -341,7 +351,9 @@ impl DeserializerConfig {
341351
pub fn output_type(&self) -> DataType {
342352
match self {
343353
DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
344-
DeserializerConfig::Json => JsonDeserializerConfig.output_type(),
354+
DeserializerConfig::Json { json } => {
355+
JsonDeserializerConfig::new(json.clone()).output_type()
356+
}
345357
#[cfg(feature = "syslog")]
346358
DeserializerConfig::Syslog => SyslogDeserializerConfig::default().output_type(),
347359
DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
@@ -354,7 +366,9 @@ impl DeserializerConfig {
354366
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
355367
match self {
356368
DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
357-
DeserializerConfig::Json => JsonDeserializerConfig.schema_definition(log_namespace),
369+
DeserializerConfig::Json { json } => {
370+
JsonDeserializerConfig::new(json.clone()).schema_definition(log_namespace)
371+
}
358372
#[cfg(feature = "syslog")]
359373
DeserializerConfig::Syslog => {
360374
SyslogDeserializerConfig::default().schema_definition(log_namespace)
@@ -371,12 +385,12 @@ impl DeserializerConfig {
371385
pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
372386
match (&self, framer) {
373387
(
374-
DeserializerConfig::Json | DeserializerConfig::NativeJson,
388+
DeserializerConfig::Json { .. } | DeserializerConfig::NativeJson,
375389
FramingConfig::NewlineDelimited { .. },
376390
) => "application/x-ndjson",
377391
(
378392
DeserializerConfig::Gelf
379-
| DeserializerConfig::Json
393+
| DeserializerConfig::Json { .. }
380394
| DeserializerConfig::NativeJson,
381395
FramingConfig::CharacterDelimited {
382396
character_delimited:
@@ -388,7 +402,7 @@ impl DeserializerConfig {
388402
) => "application/json",
389403
(DeserializerConfig::Native, _) => "application/octet-stream",
390404
(
391-
DeserializerConfig::Json
405+
DeserializerConfig::Json { .. }
392406
| DeserializerConfig::NativeJson
393407
| DeserializerConfig::Bytes
394408
| DeserializerConfig::Gelf,

src/codecs/decoding/decoder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ mod tests {
122122
let reader = StreamReader::new(stream);
123123
let decoder = Decoder::new(
124124
Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
125-
Deserializer::Json(JsonDeserializer::new()),
125+
Deserializer::Json(JsonDeserializer::default()),
126126
);
127127
let mut stream = FramedRead::new(reader, decoder);
128128

src/components/validation/resources/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S
141141
// "bytes" can be a top-level field and we aren't implicitly decoding everything into the
142142
// `message` field... but it's close enough for now.
143143
DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
144-
DeserializerConfig::Json => SerializerConfig::Json(JsonSerializerConfig::default()),
144+
DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
145145
// TODO: We need to create an Avro serializer because, certainly, for any source decoding
146146
// the data as Avro, we can't possibly send anything else without the source just
147147
// immediately barfing.
@@ -184,7 +184,9 @@ fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Des
184184
SerializerConfig::Avro { .. } => todo!(),
185185
SerializerConfig::Csv { .. } => todo!(),
186186
SerializerConfig::Gelf => DeserializerConfig::Gelf,
187-
SerializerConfig::Json(_) => DeserializerConfig::Json,
187+
SerializerConfig::Json(_) => DeserializerConfig::Json {
188+
json: Default::default(),
189+
},
188190
SerializerConfig::Logfmt => todo!(),
189191
SerializerConfig::Native => DeserializerConfig::Native,
190192
SerializerConfig::NativeJson => DeserializerConfig::NativeJson,

src/sources/datadog_agent/tests.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1595,7 +1595,9 @@ fn test_config_outputs() {
15951595
(
15961596
"json / single output",
15971597
TestCase {
1598-
decoding: DeserializerConfig::Json,
1598+
decoding: DeserializerConfig::Json {
1599+
json: Default::default(),
1600+
},
15991601
multiple_outputs: false,
16001602
want: HashMap::from([(
16011603
None,
@@ -1620,7 +1622,9 @@ fn test_config_outputs() {
16201622
(
16211623
"json / multiple output",
16221624
TestCase {
1623-
decoding: DeserializerConfig::Json,
1625+
decoding: DeserializerConfig::Json {
1626+
json: Default::default(),
1627+
},
16241628
multiple_outputs: true,
16251629
want: HashMap::from([
16261630
(

src/sources/http_client/client.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,9 @@ impl ValidatableComponent for HttpClientConfig {
235235
let config = Self {
236236
endpoint: uri.to_string(),
237237
interval: Duration::from_secs(1),
238-
decoding: DeserializerConfig::Json,
238+
decoding: DeserializerConfig::Json {
239+
json: Default::default(),
240+
},
239241
..Default::default()
240242
};
241243

0 commit comments

Comments
 (0)