Skip to content

Commit 2dfa850

Browse files
authored
feat(codecs): add lossy option to gelf, native_json, and syslog deserializers (vectordotdev#17680)
Adds a `lossy` option to the relevant deserializers. <!-- **Your PR title must conform to the conventional commit spec!** <type>(<scope>)!: <description> * `type` = chore, enhancement, feat, fix, docs * `!` = OPTIONAL: signals a breaking change * `scope` = Optional when `type` is "chore" or "docs", available scopes https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20 * `description` = short description of the change Examples: * enhancement(file source): Add `sort` option to sort discovered files * feat(new source): Initial `statsd` source * fix(file source): Fix a bug discovering new files * chore(external docs): Clarify `batch_size` option -->
1 parent ab1169b commit 2dfa850

File tree

30 files changed

+1185
-165
lines changed

30 files changed

+1185
-165
lines changed

lib/codecs/src/decoding/format/gelf.rs

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use bytes::Bytes;
22
use chrono::{DateTime, NaiveDateTime, Utc};
3+
use derivative::Derivative;
34
use lookup::{event_path, owned_value_path, PathPrefix};
45
use serde::{Deserialize, Serialize};
56
use smallvec::{smallvec, SmallVec};
67
use std::collections::HashMap;
8+
use vector_config::configurable_component;
79
use vector_core::config::LogNamespace;
810
use vector_core::{
911
config::{log_schema, DataType},
@@ -14,7 +16,7 @@ use vector_core::{
1416
use vrl::value::kind::Collection;
1517
use vrl::value::{Kind, Value};
1618

17-
use super::Deserializer;
19+
use super::{default_lossy, Deserializer};
1820
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
1921

2022
/// On GELF decoding behavior:
@@ -25,12 +27,26 @@ use crate::{gelf_fields::*, VALID_FIELD_REGEX};
2527
2628
/// Config used to build a `GelfDeserializer`.
2729
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
28-
pub struct GelfDeserializerConfig;
30+
pub struct GelfDeserializerConfig {
31+
#[serde(
32+
default,
33+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
34+
)]
35+
/// GELF-specific decoding options.
36+
pub gelf: GelfDeserializerOptions,
37+
}
2938

3039
impl GelfDeserializerConfig {
40+
/// Creates a new `GelfDeserializerConfig`.
41+
pub fn new(options: GelfDeserializerOptions) -> Self {
42+
Self { gelf: options }
43+
}
44+
3145
/// Build the `GelfDeserializer` from this configuration.
3246
pub fn build(&self) -> GelfDeserializer {
33-
GelfDeserializer::default()
47+
GelfDeserializer {
48+
lossy: self.gelf.lossy,
49+
}
3450
}
3551

3652
/// Return the type of event built by this deserializer.
@@ -60,21 +76,36 @@ impl GelfDeserializerConfig {
6076
}
6177
}
6278

63-
/// Deserializer that builds an `Event` from a byte frame containing a GELF log
64-
/// message.
65-
#[derive(Debug, Clone)]
66-
pub struct GelfDeserializer;
79+
/// GELF-specific decoding options.
80+
#[configurable_component]
81+
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
82+
#[derivative(Default)]
83+
pub struct GelfDeserializerOptions {
84+
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
85+
///
86+
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
87+
///
88+
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
89+
#[serde(
90+
default = "default_lossy",
91+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
92+
)]
93+
#[derivative(Default(value = "default_lossy()"))]
94+
pub lossy: bool,
95+
}
6796

68-
impl Default for GelfDeserializer {
69-
fn default() -> Self {
70-
Self::new()
71-
}
97+
/// Deserializer that builds an `Event` from a byte frame containing a GELF log message.
98+
#[derive(Debug, Clone, Derivative)]
99+
#[derivative(Default)]
100+
pub struct GelfDeserializer {
101+
#[derivative(Default(value = "default_lossy()"))]
102+
lossy: bool,
72103
}
73104

74105
impl GelfDeserializer {
75-
/// Create a new GelfDeserializer
76-
pub fn new() -> GelfDeserializer {
77-
GelfDeserializer
106+
/// Create a new `GelfDeserializer`.
107+
pub fn new(lossy: bool) -> GelfDeserializer {
108+
GelfDeserializer { lossy }
78109
}
79110

80111
/// Builds a LogEvent from the parsed GelfMessage.
@@ -195,10 +226,10 @@ impl Deserializer for GelfDeserializer {
195226
bytes: Bytes,
196227
_log_namespace: LogNamespace,
197228
) -> vector_common::Result<SmallVec<[Event; 1]>> {
198-
let line = std::str::from_utf8(&bytes)?;
199-
let line = line.trim();
200-
201-
let parsed: GelfMessage = serde_json::from_str(line)?;
229+
let parsed: GelfMessage = match self.lossy {
230+
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
231+
false => serde_json::from_slice(&bytes),
232+
}?;
202233
let event = self.message_to_event(&parsed)?;
203234

204235
Ok(smallvec![event])
@@ -220,7 +251,7 @@ mod tests {
220251
fn deserialize_gelf_input(
221252
input: &serde_json::Value,
222253
) -> vector_common::Result<SmallVec<[Event; 1]>> {
223-
let config = GelfDeserializerConfig;
254+
let config = GelfDeserializerConfig::default();
224255
let deserializer = config.build();
225256
let buffer = Bytes::from(serde_json::to_vec(&input).unwrap());
226257
deserializer.parse(buffer, LogNamespace::Legacy)

lib/codecs/src/decoding/format/json.rs

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use bytes::Bytes;
44
use chrono::Utc;
55
use derivative::Derivative;
66
use lookup::PathPrefix;
7-
use serde::{Deserialize, Serialize};
87
use smallvec::{smallvec, SmallVec};
98
use vector_config::configurable_component;
109
use vector_core::{
@@ -14,42 +13,27 @@ use vector_core::{
1413
};
1514
use vrl::value::Kind;
1615

17-
use super::Deserializer;
16+
use super::{default_lossy, Deserializer};
1817

1918
/// Config used to build a `JsonDeserializer`.
20-
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21-
pub struct JsonDeserializerConfig {
22-
#[serde(
23-
default,
24-
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
25-
)]
26-
/// Options for the JSON deserializer.
27-
pub json: JsonDeserializerOptions,
28-
}
29-
30-
/// JSON-specific decoding options.
3119
#[configurable_component]
3220
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
3321
#[derivative(Default)]
34-
pub struct JsonDeserializerOptions {
35-
/// Determines whether or not to replace invalid UTF-8 sequences instead of returning an error.
36-
///
37-
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
38-
///
39-
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
22+
pub struct JsonDeserializerConfig {
4023
#[serde(
41-
default = "default_lossy",
24+
default,
4225
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
4326
)]
44-
#[derivative(Default(value = "default_lossy()"))]
45-
lossy: bool,
46-
}
47-
48-
const fn default_lossy() -> bool {
49-
true
27+
/// JSON-specific decoding options.
28+
pub json: JsonDeserializerOptions,
5029
}
5130

5231
impl JsonDeserializerConfig {
32+
/// Creates a new `JsonDeserializerConfig`.
33+
pub fn new(options: JsonDeserializerOptions) -> Self {
34+
Self { json: options }
35+
}
36+
5337
/// Build the `JsonDeserializer` from this configuration.
5438
pub fn build(&self) -> JsonDeserializer {
5539
Into::<JsonDeserializer>::into(self)
@@ -85,11 +69,22 @@ impl JsonDeserializerConfig {
8569
}
8670
}
8771

88-
impl JsonDeserializerConfig {
89-
/// Creates a new `JsonDeserializerConfig`.
90-
pub fn new(options: JsonDeserializerOptions) -> Self {
91-
Self { json: options }
92-
}
72+
/// JSON-specific decoding options.
73+
#[configurable_component]
74+
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
75+
#[derivative(Default)]
76+
pub struct JsonDeserializerOptions {
77+
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
78+
///
79+
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
80+
///
81+
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
82+
#[serde(
83+
default = "default_lossy",
84+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
85+
)]
86+
#[derivative(Default(value = "default_lossy()"))]
87+
pub lossy: bool,
9388
}
9489

9590
/// Deserializer that builds `Event`s from a byte frame containing JSON.

lib/codecs/src/decoding/format/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,19 @@ mod syslog;
1313

1414
use ::bytes::Bytes;
1515
use dyn_clone::DynClone;
16-
pub use gelf::{GelfDeserializer, GelfDeserializerConfig};
16+
pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions};
1717
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
1818
pub use native::{NativeDeserializer, NativeDeserializerConfig};
19-
pub use native_json::{NativeJsonDeserializer, NativeJsonDeserializerConfig};
19+
pub use native_json::{
20+
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
21+
};
2022
use smallvec::SmallVec;
23+
#[cfg(feature = "syslog")]
24+
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
2125
use vector_core::config::LogNamespace;
2226
use vector_core::event::Event;
2327

2428
pub use self::bytes::{BytesDeserializer, BytesDeserializerConfig};
25-
#[cfg(feature = "syslog")]
26-
pub use self::syslog::{SyslogDeserializer, SyslogDeserializerConfig};
2729

2830
/// Parse structured events from bytes.
2931
pub trait Deserializer: DynClone + Send + Sync {
@@ -44,3 +46,8 @@ dyn_clone::clone_trait_object!(Deserializer);
4446

4547
/// A `Box` containing a `Deserializer`.
4648
pub type BoxedDeserializer = Box<dyn Deserializer>;
49+
50+
/// Default value for the UTF-8 lossy option.
51+
const fn default_lossy() -> bool {
52+
true
53+
}

lib/codecs/src/decoding/format/native_json.rs

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,35 @@
11
use bytes::Bytes;
2+
use derivative::Derivative;
23
use serde::{Deserialize, Serialize};
34
use smallvec::{smallvec, SmallVec};
5+
use vector_config::configurable_component;
46
use vector_core::{config::DataType, event::Event, schema};
57
use vrl::value::kind::Collection;
68
use vrl::value::Kind;
79

8-
use super::Deserializer;
10+
use super::{default_lossy, Deserializer};
911
use vector_core::config::LogNamespace;
1012

1113
/// Config used to build a `NativeJsonDeserializer`.
1214
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
13-
pub struct NativeJsonDeserializerConfig;
15+
pub struct NativeJsonDeserializerConfig {
16+
/// Vector's native JSON-specific decoding options.
17+
pub native_json: NativeJsonDeserializerOptions,
18+
}
1419

1520
impl NativeJsonDeserializerConfig {
21+
/// Creates a new `NativeJsonDeserializerConfig`.
22+
pub fn new(options: NativeJsonDeserializerOptions) -> Self {
23+
Self {
24+
native_json: options,
25+
}
26+
}
27+
1628
/// Build the `NativeJsonDeserializer` from this configuration.
17-
pub const fn build(&self) -> NativeJsonDeserializer {
18-
NativeJsonDeserializer
29+
pub fn build(&self) -> NativeJsonDeserializer {
30+
NativeJsonDeserializer {
31+
lossy: self.native_json.lossy,
32+
}
1933
}
2034

2135
/// Return the type of event build by this deserializer.
@@ -37,10 +51,32 @@ impl NativeJsonDeserializerConfig {
3751
}
3852
}
3953

54+
/// Vector's native JSON-specific decoding options.
55+
#[configurable_component]
56+
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
57+
#[derivative(Default)]
58+
pub struct NativeJsonDeserializerOptions {
59+
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
60+
///
61+
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
62+
///
63+
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
64+
#[serde(
65+
default = "default_lossy",
66+
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
67+
)]
68+
#[derivative(Default(value = "default_lossy()"))]
69+
pub lossy: bool,
70+
}
71+
4072
/// Deserializer that builds `Event`s from a byte frame containing Vector's native JSON
4173
/// representation.
42-
#[derive(Debug, Clone, Default)]
43-
pub struct NativeJsonDeserializer;
74+
#[derive(Debug, Clone, Derivative)]
75+
#[derivative(Default)]
76+
pub struct NativeJsonDeserializer {
77+
#[derivative(Default(value = "default_lossy()"))]
78+
lossy: bool,
79+
}
4480

4581
impl Deserializer for NativeJsonDeserializer {
4682
fn parse(
@@ -56,8 +92,11 @@ impl Deserializer for NativeJsonDeserializer {
5692
return Ok(smallvec![]);
5793
}
5894

59-
let json: serde_json::Value = serde_json::from_slice(&bytes)
60-
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
95+
let json: serde_json::Value = match self.lossy {
96+
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
97+
false => serde_json::from_slice(&bytes),
98+
}
99+
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
61100

62101
let events = match json {
63102
serde_json::Value::Array(values) => values
@@ -79,7 +118,7 @@ mod test {
79118

80119
#[test]
81120
fn parses_top_level_arrays() {
82-
let config = NativeJsonDeserializerConfig;
121+
let config = NativeJsonDeserializerConfig::default();
83122
let deserializer = config.build();
84123

85124
let json1 = json!({"a": "b", "c": "d"});

0 commit comments

Comments
 (0)