Skip to content

Commit 0f14c0d

Browse files
authored
feat: Migrate LogSchema::message_key to new lookup code (vectordotdev#18024)
## Motivation

This is part of vectordotdev#13033.

## Summary

* `LogSchema::message_key` is now an `OptionalValuePath`.
* To avoid hacky `String` to `&'static str` conversions, I changed the `Requirement::meaning` key type to `String`.
1 parent 689a79e commit 0f14c0d

File tree

55 files changed

+639
-410
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+639
-410
lines changed

benches/codecs/character_delimited_bytes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn decoding(c: &mut Criterion) {
5555
.map(|ml| CharacterDelimitedDecoder::new_with_max_length(b'a', ml))
5656
.unwrap_or(CharacterDelimitedDecoder::new(b'a')),
5757
);
58-
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
58+
let deserializer = Deserializer::Bytes(BytesDeserializer);
5959
let decoder = vector::codecs::Decoder::new(framer, deserializer);
6060

6161
(Box::new(decoder), param.input.clone())

benches/codecs/newline_bytes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ fn decoding(c: &mut Criterion) {
5353
.map(|ml| NewlineDelimitedDecoder::new_with_max_length(ml))
5454
.unwrap_or(NewlineDelimitedDecoder::new()),
5555
);
56-
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
56+
let deserializer = Deserializer::Bytes(BytesDeserializer);
5757
let decoder = vector::codecs::Decoder::new(framer, deserializer);
5858

5959
(Box::new(decoder), param.input.clone())

lib/codecs/src/decoding/format/bytes.rs

+11-24
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use bytes::Bytes;
2-
use lookup::lookup_v2::parse_value_path;
32
use lookup::OwnedTargetPath;
43
use serde::{Deserialize, Serialize};
54
use smallvec::{smallvec, SmallVec};
@@ -9,6 +8,7 @@ use vector_core::{
98
event::{Event, LogEvent},
109
schema,
1110
};
11+
use vrl::path::PathPrefix;
1212
use vrl::value::Kind;
1313

1414
use super::Deserializer;
@@ -25,7 +25,7 @@ impl BytesDeserializerConfig {
2525

2626
/// Build the `BytesDeserializer` from this configuration.
2727
pub fn build(&self) -> BytesDeserializer {
28-
BytesDeserializer::new()
28+
BytesDeserializer
2929
}
3030

3131
/// Return the type of event built by this deserializer.
@@ -37,7 +37,7 @@ impl BytesDeserializerConfig {
3737
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
3838
match log_namespace {
3939
LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field(
40-
&parse_value_path(log_schema().message_key()).expect("valid message key"),
40+
log_schema().message_key().expect("valid message key"),
4141
Kind::bytes(),
4242
Some("message"),
4343
),
@@ -54,32 +54,16 @@ impl BytesDeserializerConfig {
5454
/// This deserializer can be considered as the no-op action for input where no
5555
/// further decoding has been specified.
5656
#[derive(Debug, Clone)]
57-
pub struct BytesDeserializer {
58-
// Only used with the "Legacy" namespace. The "Vector" namespace decodes the data at the root of the event.
59-
log_schema_message_key: &'static str,
60-
}
61-
62-
impl Default for BytesDeserializer {
63-
fn default() -> Self {
64-
Self::new()
65-
}
66-
}
57+
pub struct BytesDeserializer;
6758

6859
impl BytesDeserializer {
69-
/// Creates a new `BytesDeserializer`.
70-
pub fn new() -> Self {
71-
Self {
72-
log_schema_message_key: log_schema().message_key(),
73-
}
74-
}
75-
7660
/// Deserializes the given bytes, which will always produce a single `LogEvent`.
7761
pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
7862
match log_namespace {
7963
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
8064
LogNamespace::Legacy => {
8165
let mut log = LogEvent::default();
82-
log.insert(self.log_schema_message_key, bytes);
66+
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes);
8367
log
8468
}
8569
}
@@ -107,15 +91,18 @@ mod tests {
10791
#[test]
10892
fn deserialize_bytes_legacy_namespace() {
10993
let input = Bytes::from("foo");
110-
let deserializer = BytesDeserializer::new();
94+
let deserializer = BytesDeserializer;
11195

11296
let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
11397
let mut events = events.into_iter();
11498

11599
{
116100
let event = events.next().unwrap();
117101
let log = event.as_log();
118-
assert_eq!(log[log_schema().message_key()], "foo".into());
102+
assert_eq!(
103+
log[log_schema().message_key().unwrap().to_string()],
104+
"foo".into()
105+
);
119106
}
120107

121108
assert_eq!(events.next(), None);
@@ -124,7 +111,7 @@ mod tests {
124111
#[test]
125112
fn deserialize_bytes_vector_namespace() {
126113
let input = Bytes::from("foo");
127-
let deserializer = BytesDeserializer::new();
114+
let deserializer = BytesDeserializer;
128115

129116
let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
130117
assert_eq!(events.len(), 1);

lib/codecs/src/decoding/format/gelf.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ mod tests {
293293
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
294294
);
295295
assert_eq!(
296-
log.get(log_schema().message_key()),
296+
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
297297
Some(&Value::Bytes(Bytes::from_static(
298298
b"A short message that helps you identify what is going on"
299299
)))
@@ -348,7 +348,7 @@ mod tests {
348348
let events = deserialize_gelf_input(&input).unwrap();
349349
assert_eq!(events.len(), 1);
350350
let log = events[0].as_log();
351-
assert!(log.contains(log_schema().message_key()));
351+
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
352352
}
353353

354354
// filter out id

lib/codecs/src/decoding/format/syslog.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use bytes::Bytes;
22
use chrono::{DateTime, Datelike, Utc};
33
use derivative::Derivative;
4-
use lookup::lookup_v2::parse_value_path;
54
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
65
use smallvec::{smallvec, SmallVec};
76
use std::borrow::Cow;
@@ -71,7 +70,7 @@ impl SyslogDeserializerConfig {
7170
// The `message` field is always defined. If parsing fails, the entire body becomes the
7271
// message.
7372
.with_event_field(
74-
&parse_value_path(log_schema().message_key()).expect("valid message key"),
73+
log_schema().message_key().expect("valid message key"),
7574
Kind::bytes(),
7675
Some("message"),
7776
);
@@ -429,7 +428,7 @@ fn insert_fields_from_syslog(
429428
) {
430429
match log_namespace {
431430
LogNamespace::Legacy => {
432-
log.insert(event_path!(log_schema().message_key()), parsed.msg);
431+
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg);
433432
}
434433
LogNamespace::Vector => {
435434
log.insert(event_path!("message"), parsed.msg);
@@ -500,7 +499,10 @@ mod tests {
500499

501500
let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
502501
assert_eq!(events.len(), 1);
503-
assert_eq!(events[0].as_log()[log_schema().message_key()], "MSG".into());
502+
assert_eq!(
503+
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
504+
"MSG".into()
505+
);
504506
assert!(
505507
events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
506508
);
@@ -522,8 +524,8 @@ mod tests {
522524

523525
fn init() {
524526
let mut schema = LogSchema::default();
525-
schema.set_message_key("legacy_message".to_string());
526-
schema.set_message_key("legacy_timestamp".to_string());
527+
schema.set_message_key(Some(owned_value_path!("legacy_message")));
528+
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
527529
init_log_schema(schema, false);
528530
}
529531
}

lib/codecs/src/encoding/format/common.rs

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use vector_core::config::log_schema;
2+
use vector_core::schema;
3+
use vrl::value::Kind;
4+
5+
/// Inspect the global log schema and create a schema requirement.
6+
pub fn get_serializer_schema_requirement() -> schema::Requirement {
7+
if let Some(message_key) = log_schema().message_key() {
8+
schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any())
9+
} else {
10+
schema::Requirement::empty()
11+
}
12+
}

lib/codecs/src/encoding/format/gelf.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use vector_core::{
1212
event::Value,
1313
schema,
1414
};
15+
use vrl::path::PathPrefix;
1516

1617
/// On GELF encoding behavior:
1718
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
@@ -138,13 +139,15 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
138139
err_missing_field(HOST)?;
139140
}
140141

141-
let message_key = log_schema().message_key();
142142
if !log.contains(SHORT_MESSAGE) {
143-
// rename the log_schema().message_key() to SHORT_MESSAGE
144-
if log.contains(message_key) {
145-
log.rename_key(message_key, SHORT_MESSAGE);
146-
} else {
147-
err_missing_field(SHORT_MESSAGE)?;
143+
if let Some(message_key) = log_schema().message_key() {
144+
// rename the log_schema().message_key() to SHORT_MESSAGE
145+
let target_path = (PathPrefix::Event, message_key);
146+
if log.contains(target_path) {
147+
log.rename_key(target_path, SHORT_MESSAGE);
148+
} else {
149+
err_missing_field(SHORT_MESSAGE)?;
150+
}
148151
}
149152
}
150153
Ok(log)
@@ -329,7 +332,7 @@ mod tests {
329332
let event_fields = btreemap! {
330333
VERSION => "1.1",
331334
HOST => "example.org",
332-
log_schema().message_key() => "Some message",
335+
log_schema().message_key().unwrap().to_string() => "Some message",
333336
};
334337

335338
let jsn = do_serialize(true, event_fields).unwrap();

lib/codecs/src/encoding/format/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#![deny(missing_docs)]
55

66
mod avro;
7+
mod common;
78
mod csv;
89
mod gelf;
910
mod json;

lib/codecs/src/encoding/format/raw_message.rs

+4-16
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1+
use crate::encoding::format::common::get_serializer_schema_requirement;
12
use bytes::{BufMut, BytesMut};
23
use serde::{Deserialize, Serialize};
34
use tokio_util::codec::Encoder;
4-
use vector_core::{
5-
config::{log_schema, DataType},
6-
event::Event,
7-
schema,
8-
};
9-
use vrl::value::Kind;
5+
use vector_core::{config::DataType, event::Event, schema};
106

117
/// Config used to build a `RawMessageSerializer`.
128
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
@@ -30,7 +26,7 @@ impl RawMessageSerializerConfig {
3026

3127
/// The schema required by the serializer.
3228
pub fn schema_requirement(&self) -> schema::Requirement {
33-
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
29+
get_serializer_schema_requirement()
3430
}
3531
}
3632

@@ -49,18 +45,10 @@ impl Encoder<Event> for RawMessageSerializer {
4945
type Error = vector_common::Error;
5046

5147
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
52-
let message_key = log_schema().message_key();
53-
5448
let log = event.as_log();
55-
56-
if let Some(bytes) = log
57-
.get_by_meaning(message_key)
58-
.or_else(|| log.get(message_key))
59-
.map(|value| value.coerce_to_bytes())
60-
{
49+
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
6150
buffer.put(bytes);
6251
}
63-
6452
Ok(())
6553
}
6654
}

lib/codecs/src/encoding/format/text.rs

+4-14
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1+
use crate::encoding::format::common::get_serializer_schema_requirement;
12
use bytes::{BufMut, BytesMut};
23
use tokio_util::codec::Encoder;
3-
use vector_core::{
4-
config::{log_schema, DataType},
5-
event::Event,
6-
schema,
7-
};
8-
use vrl::value::Kind;
4+
use vector_core::{config::DataType, event::Event, schema};
95

106
use crate::MetricTagValues;
117

@@ -42,7 +38,7 @@ impl TextSerializerConfig {
4238

4339
/// The schema required by the serializer.
4440
pub fn schema_requirement(&self) -> schema::Requirement {
45-
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
41+
get_serializer_schema_requirement()
4642
}
4743
}
4844

@@ -67,15 +63,9 @@ impl Encoder<Event> for TextSerializer {
6763
type Error = vector_common::Error;
6864

6965
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
70-
let message_key = log_schema().message_key();
71-
7266
match event {
7367
Event::Log(log) => {
74-
if let Some(bytes) = log
75-
.get_by_meaning(message_key)
76-
.or_else(|| log.get(message_key))
77-
.map(|value| value.coerce_to_bytes())
78-
{
68+
if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
7969
buffer.put(bytes);
8070
}
8171
}

lib/opentelemetry-proto/src/convert.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use vector_core::{
77
config::{log_schema, LegacyKey, LogNamespace},
88
event::{Event, LogEvent},
99
};
10+
use vrl::path::PathPrefix;
1011
use vrl::value::Value;
1112

1213
use super::proto::{
@@ -94,7 +95,7 @@ impl ResourceLog {
9495
LogNamespace::Legacy => {
9596
let mut log = LogEvent::default();
9697
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
97-
log.insert(log_schema().message_key(), v);
98+
log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v);
9899
}
99100
log
100101
}

0 commit comments

Comments
 (0)