Skip to content

feat(codecs): Add lossy option to JSON deserializer #17628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 92 additions & 15 deletions lib/codecs/src/decoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::convert::TryInto;

use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use lookup::PathPrefix;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
config::{log_schema, DataType, LogNamespace},
event::Event,
Expand All @@ -16,7 +18,36 @@ use super::Deserializer;

/// Config used to build a `JsonDeserializer`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct JsonDeserializerConfig;
pub struct JsonDeserializerConfig {
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
/// Options for the JSON deserializer.
pub json: JsonDeserializerOptions,
}

/// JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of returning an error.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

const fn default_lossy() -> bool {
true
}

impl JsonDeserializerConfig {
/// Build the `JsonDeserializer` from this configuration.
Expand Down Expand Up @@ -56,19 +87,23 @@ impl JsonDeserializerConfig {

impl JsonDeserializerConfig {
/// Creates a new `JsonDeserializerConfig`.
pub fn new() -> Self {
Default::default()
pub fn new(options: JsonDeserializerOptions) -> Self {
Self { json: options }
}
}

/// Deserializer that builds `Event`s from a byte frame containing JSON.
#[derive(Debug, Clone, Default)]
pub struct JsonDeserializer;
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl JsonDeserializer {
/// Creates a new `JsonDeserializer`.
pub fn new() -> Self {
Default::default()
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}

Expand All @@ -84,8 +119,11 @@ impl Deserializer for JsonDeserializer {
return Ok(smallvec![]);
}

let json: serde_json::Value = serde_json::from_slice(&bytes)
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
let json: serde_json::Value = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;

// If the root is an Array, split it into multiple events
let mut events = match json {
Expand Down Expand Up @@ -119,8 +157,10 @@ impl Deserializer for JsonDeserializer {
}

impl From<&JsonDeserializerConfig> for JsonDeserializer {
fn from(_: &JsonDeserializerConfig) -> Self {
Self
fn from(config: &JsonDeserializerConfig) -> Self {
Self {
lossy: config.json.lossy,
}
}
}

Expand All @@ -133,7 +173,7 @@ mod tests {
#[test]
fn deserialize_json() {
let input = Bytes::from(r#"{ "foo": 123 }"#);
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
Expand All @@ -160,7 +200,7 @@ mod tests {
#[test]
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
Expand Down Expand Up @@ -197,7 +237,7 @@ mod tests {
#[test]
fn deserialize_skip_empty() {
let input = Bytes::from("");
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
Expand All @@ -208,7 +248,44 @@ mod tests {
#[test]
fn deserialize_error_invalid_json() {
let input = Bytes::from("{ foo");
let deserializer = JsonDeserializer::new();
let deserializer = JsonDeserializer::default();

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
}
}

#[test]
fn deserialize_lossy_replace_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(true);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();

{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["foo"], b"Hello \xEF\xBF\xBDWorld".into());
assert_eq!(
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.is_some(),
namespace == LogNamespace::Legacy
);
}

assert_eq!(events.next(), None);
}
}

#[test]
fn deserialize_non_lossy_error_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(false);

for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod syslog;
use ::bytes::Bytes;
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{NativeJsonDeserializer, NativeJsonDeserializerConfig};
use smallvec::SmallVec;
Expand Down
38 changes: 26 additions & 12 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use bytes::{Bytes, BytesMut};
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, NativeDeserializer,
NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig,
GelfDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig,
};
#[cfg(feature = "syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig};
Expand Down Expand Up @@ -243,7 +244,14 @@ pub enum DeserializerConfig {
/// Decodes the raw bytes as [JSON][json].
///
/// [json]: https://www.json.org/
Json,
Json {
/// Options for the JSON deserializer.
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
json: JsonDeserializerOptions,
},

#[cfg(feature = "syslog")]
/// Decodes the raw bytes as a Syslog message.
Expand Down Expand Up @@ -284,8 +292,8 @@ impl From<BytesDeserializerConfig> for DeserializerConfig {
}

impl From<JsonDeserializerConfig> for DeserializerConfig {
fn from(_: JsonDeserializerConfig) -> Self {
Self::Json
fn from(config: JsonDeserializerConfig) -> Self {
Self::Json { json: config.json }
}
}

Expand All @@ -307,7 +315,9 @@ impl DeserializerConfig {
pub fn build(&self) -> Deserializer {
match self {
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
DeserializerConfig::Json => Deserializer::Json(JsonDeserializerConfig.build()),
DeserializerConfig::Json { json } => {
Deserializer::Json(JsonDeserializerConfig::new(json.clone()).build())
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => {
Deserializer::Syslog(SyslogDeserializerConfig::default().build())
Expand All @@ -325,7 +335,7 @@ impl DeserializerConfig {
match self {
DeserializerConfig::Native => FramingConfig::LengthDelimited,
DeserializerConfig::Bytes
| DeserializerConfig::Json
| DeserializerConfig::Json { .. }
| DeserializerConfig::Gelf
| DeserializerConfig::NativeJson => FramingConfig::NewlineDelimited {
newline_delimited: Default::default(),
Expand All @@ -341,7 +351,9 @@ impl DeserializerConfig {
pub fn output_type(&self) -> DataType {
match self {
DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
DeserializerConfig::Json => JsonDeserializerConfig.output_type(),
DeserializerConfig::Json { json } => {
JsonDeserializerConfig::new(json.clone()).output_type()
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => SyslogDeserializerConfig::default().output_type(),
DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
Expand All @@ -354,7 +366,9 @@ impl DeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match self {
DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::Json => JsonDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::Json { json } => {
JsonDeserializerConfig::new(json.clone()).schema_definition(log_namespace)
}
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog => {
SyslogDeserializerConfig::default().schema_definition(log_namespace)
Expand All @@ -371,12 +385,12 @@ impl DeserializerConfig {
pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
match (&self, framer) {
(
DeserializerConfig::Json | DeserializerConfig::NativeJson,
DeserializerConfig::Json { .. } | DeserializerConfig::NativeJson,
FramingConfig::NewlineDelimited { .. },
) => "application/x-ndjson",
(
DeserializerConfig::Gelf
| DeserializerConfig::Json
| DeserializerConfig::Json { .. }
| DeserializerConfig::NativeJson,
FramingConfig::CharacterDelimited {
character_delimited:
Expand All @@ -388,7 +402,7 @@ impl DeserializerConfig {
) => "application/json",
(DeserializerConfig::Native, _) => "application/octet-stream",
(
DeserializerConfig::Json
DeserializerConfig::Json { .. }
| DeserializerConfig::NativeJson
| DeserializerConfig::Bytes
| DeserializerConfig::Gelf,
Expand Down
2 changes: 1 addition & 1 deletion src/codecs/decoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ mod tests {
let reader = StreamReader::new(stream);
let decoder = Decoder::new(
Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
Deserializer::Json(JsonDeserializer::new()),
Deserializer::Json(JsonDeserializer::default()),
);
let mut stream = FramedRead::new(reader, decoder);

Expand Down
6 changes: 4 additions & 2 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S
// "bytes" can be a top-level field and we aren't implicitly decoding everything into the
// `message` field... but it's close enough for now.
DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
DeserializerConfig::Json => SerializerConfig::Json(JsonSerializerConfig::default()),
DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
// TODO: We need to create an Avro serializer because, certainly, for any source decoding
// the data as Avro, we can't possibly send anything else without the source just
// immediately barfing.
Expand Down Expand Up @@ -184,7 +184,9 @@ fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Des
SerializerConfig::Avro { .. } => todo!(),
SerializerConfig::Csv { .. } => todo!(),
SerializerConfig::Gelf => DeserializerConfig::Gelf,
SerializerConfig::Json(_) => DeserializerConfig::Json,
SerializerConfig::Json(_) => DeserializerConfig::Json {
json: Default::default(),
},
SerializerConfig::Logfmt => todo!(),
SerializerConfig::Native => DeserializerConfig::Native,
SerializerConfig::NativeJson => DeserializerConfig::NativeJson,
Expand Down
8 changes: 6 additions & 2 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,9 @@ fn test_config_outputs() {
(
"json / single output",
TestCase {
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
multiple_outputs: false,
want: HashMap::from([(
None,
Expand All @@ -1620,7 +1622,9 @@ fn test_config_outputs() {
(
"json / multiple output",
TestCase {
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
multiple_outputs: true,
want: HashMap::from([
(
Expand Down
4 changes: 3 additions & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ impl ValidatableComponent for HttpClientConfig {
let config = Self {
endpoint: uri.to_string(),
interval: Duration::from_secs(1),
decoding: DeserializerConfig::Json,
decoding: DeserializerConfig::Json {
json: Default::default(),
},
..Default::default()
};

Expand Down
Loading