Skip to content

Commit 32950d8

Browse files
prontStephenWakely
andauthored
feat: Migrate LogSchema::host_key to new lookup code (vectordotdev#17972)
This part of vectordotdev#13033. Summary of these changes: * LogSchema::host_key is now an `OptionalValuePath` * `host_key`s that appear in configs are now also `OptionalValuePath`s * There should be no `unwrap()` calls outside of tests. --------- Co-authored-by: Stephen Wakely <[email protected]>
1 parent fd10e69 commit 32950d8

File tree

39 files changed

+306
-201
lines changed

39 files changed

+306
-201
lines changed

lib/vector-core/src/config/log_schema.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct LogSchema {
5555
/// This field will generally represent a real host, or container, that generated the message,
5656
/// but is somewhat source-dependent.
5757
#[serde(default = "LogSchema::default_host_key")]
58-
host_key: String,
58+
host_key: OptionalValuePath,
5959

6060
/// The name of the event field to set the source identifier in.
6161
///
@@ -92,8 +92,8 @@ impl LogSchema {
9292
OptionalValuePath::new("timestamp")
9393
}
9494

95-
fn default_host_key() -> String {
96-
String::from("host")
95+
fn default_host_key() -> OptionalValuePath {
96+
OptionalValuePath::new("host")
9797
}
9898

9999
fn default_source_type_key() -> OptionalValuePath {
@@ -121,8 +121,8 @@ impl LogSchema {
121121
self.timestamp_key.path.as_ref()
122122
}
123123

124-
pub fn host_key(&self) -> &str {
125-
&self.host_key
124+
pub fn host_key(&self) -> Option<&OwnedValuePath> {
125+
self.host_key.path.as_ref()
126126
}
127127

128128
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
@@ -141,8 +141,8 @@ impl LogSchema {
141141
self.timestamp_key = OptionalValuePath { path: v };
142142
}
143143

144-
pub fn set_host_key(&mut self, v: String) {
145-
self.host_key = v;
144+
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
145+
self.host_key = OptionalValuePath { path };
146146
}
147147

148148
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
@@ -169,7 +169,7 @@ impl LogSchema {
169169
{
170170
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
171171
} else {
172-
self.set_host_key(other.host_key().to_string());
172+
self.set_host_key(other.host_key().cloned());
173173
}
174174
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
175175
&& self.message_key() != other.message_key()

lib/vector-core/src/event/log_event.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ impl LogEvent {
458458
pub fn host_path(&self) -> Option<String> {
459459
match self.namespace() {
460460
LogNamespace::Vector => self.find_key_by_meaning("host"),
461-
LogNamespace::Legacy => Some(log_schema().host_key().to_owned()),
461+
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
462462
}
463463
}
464464

@@ -505,7 +505,9 @@ impl LogEvent {
505505
pub fn get_host(&self) -> Option<&Value> {
506506
match self.namespace() {
507507
LogNamespace::Vector => self.get_by_meaning("host"),
508-
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())),
508+
LogNamespace::Legacy => log_schema()
509+
.host_key()
510+
.and_then(|key| self.get((PathPrefix::Event, key))),
509511
}
510512
}
511513

lib/vector-lookup/src/lookup_v2/optional_path.rs

+8
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,11 @@ impl From<OwnedValuePath> for OptionalValuePath {
9191
Self { path: Some(path) }
9292
}
9393
}
94+
95+
impl From<Option<OwnedValuePath>> for OptionalValuePath {
96+
fn from(value: Option<OwnedValuePath>) -> Self {
97+
value.map_or(OptionalValuePath::none(), |path| {
98+
OptionalValuePath::from(path)
99+
})
100+
}
101+
}

src/config/mod.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,10 @@ mod tests {
843843
)
844844
.unwrap();
845845

846-
assert_eq!("host", config.global.log_schema.host_key().to_string());
846+
assert_eq!(
847+
"host",
848+
config.global.log_schema.host_key().unwrap().to_string()
849+
);
847850
assert_eq!(
848851
"message",
849852
config.global.log_schema.message_key().to_string()
@@ -879,7 +882,10 @@ mod tests {
879882
)
880883
.unwrap();
881884

882-
assert_eq!("this", config.global.log_schema.host_key().to_string());
885+
assert_eq!(
886+
"this",
887+
config.global.log_schema.host_key().unwrap().to_string()
888+
);
883889
assert_eq!("that", config.global.log_schema.message_key().to_string());
884890
assert_eq!(
885891
"then",

src/sinks/datadog/metrics/encoder.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,10 @@ fn sketch_to_proto_message(
385385
let name = get_namespaced_name(metric, default_namespace);
386386
let ts = encode_timestamp(metric.timestamp());
387387
let mut tags = metric.tags().cloned().unwrap_or_default();
388-
let host = tags.remove(log_schema.host_key()).unwrap_or_default();
388+
let host = log_schema
389+
.host_key()
390+
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
391+
.unwrap_or_default();
389392
let tags = encode_tags(&tags);
390393

391394
let cnt = ddsketch.count() as i64;
@@ -497,7 +500,10 @@ fn generate_series_metrics(
497500
let name = get_namespaced_name(metric, default_namespace);
498501

499502
let mut tags = metric.tags().cloned().unwrap_or_default();
500-
let host = tags.remove(log_schema.host_key());
503+
let host = log_schema
504+
.host_key()
505+
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
506+
501507
let source_type_name = tags.remove("source_type_name");
502508
let device = tags.remove("device");
503509
let ts = encode_timestamp(metric.timestamp());

src/sinks/datadog/traces/sink.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use futures_util::{
77
};
88
use tokio::sync::oneshot::{channel, Sender};
99
use tower::Service;
10+
use vrl::path::PathPrefix;
11+
1012
use vector_core::{
1113
config::log_schema,
1214
event::Event,
@@ -15,11 +17,13 @@ use vector_core::{
1517
stream::{BatcherSettings, DriverResponse},
1618
};
1719

18-
use super::service::TraceApiRequest;
1920
use crate::{
2021
internal_events::DatadogTracesEncodingError,
2122
sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
2223
};
24+
25+
use super::service::TraceApiRequest;
26+
2327
#[derive(Default)]
2428
struct EventPartitioner;
2529

@@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner {
5155
Event::Trace(t) => PartitionKey {
5256
api_key: item.metadata().datadog_api_key(),
5357
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
54-
hostname: t
55-
.get(log_schema().host_key())
56-
.map(|s| s.to_string_lossy().into_owned()),
58+
hostname: log_schema().host_key().and_then(|key| {
59+
t.get((PathPrefix::Event, key))
60+
.map(|s| s.to_string_lossy().into_owned())
61+
}),
5762
agent_version: t
5863
.get("agent_version")
5964
.map(|s| s.to_string_lossy().into_owned()),

src/sinks/humio/logs.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use lookup::lookup_v2::OptionalValuePath;
33
use vector_common::sensitive_string::SensitiveString;
44
use vector_config::configurable_component;
55

6-
use super::host_key;
6+
use super::config_host_key;
77
use crate::sinks::splunk_hec::common::config_timestamp_key;
88
use crate::{
99
codecs::EncodingConfig,
@@ -74,8 +74,8 @@ pub struct HumioLogsConfig {
7474
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
7575
///
7676
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
77-
#[serde(default = "host_key")]
78-
pub(super) host_key: String,
77+
#[serde(default = "config_host_key")]
78+
pub(super) host_key: OptionalValuePath,
7979

8080
/// Event fields to be added to Humio’s extra fields.
8181
///
@@ -154,7 +154,7 @@ impl GenerateConfig for HumioLogsConfig {
154154
event_type: None,
155155
indexed_fields: vec![],
156156
index: None,
157-
host_key: host_key(),
157+
host_key: config_host_key(),
158158
compression: Compression::default(),
159159
request: TowerRequestConfig::default(),
160160
batch: BatchConfig::default(),
@@ -231,6 +231,7 @@ mod integration_tests {
231231
use serde::Deserialize;
232232
use serde_json::{json, Value as JsonValue};
233233
use tokio::time::Duration;
234+
use vrl::path::PathPrefix;
234235

235236
use super::*;
236237
use crate::{
@@ -262,14 +263,14 @@ mod integration_tests {
262263
let message = random_string(100);
263264
let host = "192.168.1.1".to_string();
264265
let mut event = LogEvent::from(message.clone());
265-
event.insert(log_schema().host_key(), host.clone());
266+
event.insert(
267+
(PathPrefix::Event, log_schema().host_key().unwrap()),
268+
host.clone(),
269+
);
266270

267271
let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
268272
event.insert(
269-
(
270-
lookup::PathPrefix::Event,
271-
log_schema().timestamp_key().unwrap(),
272-
),
273+
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
273274
ts,
274275
);
275276

@@ -387,7 +388,9 @@ mod integration_tests {
387388
source: None,
388389
encoding: JsonSerializerConfig::default().into(),
389390
event_type: None,
390-
host_key: log_schema().host_key().to_string(),
391+
host_key: OptionalValuePath {
392+
path: log_schema().host_key().cloned(),
393+
},
391394
indexed_fields: vec![],
392395
index: None,
393396
compression: Compression::None,

src/sinks/humio/metrics.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use vector_config::configurable_component;
99
use vector_core::sink::StreamSink;
1010

1111
use super::{
12-
host_key,
12+
config_host_key,
1313
logs::{HumioLogsConfig, HOST},
1414
};
1515
use crate::{
@@ -86,8 +86,8 @@ pub struct HumioMetricsConfig {
8686
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
8787
///
8888
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
89-
#[serde(default = "host_key")]
90-
host_key: String,
89+
#[serde(default = "config_host_key")]
90+
host_key: OptionalValuePath,
9191

9292
/// Event fields to be added to Humio’s extra fields.
9393
///

src/sinks/humio/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
use lookup::lookup_v2::OptionalValuePath;
2+
13
pub mod logs;
24
pub mod metrics;
35

4-
fn host_key() -> String {
5-
crate::config::log_schema().host_key().to_string()
6+
pub fn config_host_key() -> OptionalValuePath {
7+
OptionalValuePath {
8+
path: crate::config::log_schema().host_key().cloned(),
9+
}
610
}

src/sinks/influxdb/logs.rs

+14-10
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut};
44
use futures::SinkExt;
55
use http::{Request, Uri};
66
use indoc::indoc;
7+
use vrl::value::Kind;
8+
79
use lookup::lookup_v2::{parse_value_path, OptionalValuePath};
810
use lookup::{OwnedValuePath, PathPrefix};
911
use vector_config::configurable_component;
1012
use vector_core::config::log_schema;
1113
use vector_core::schema;
12-
use vrl::value::Kind;
1314

1415
use crate::{
1516
codecs::Transformer,
@@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig {
189190
.host_key
190191
.clone()
191192
.and_then(|k| k.path)
192-
.unwrap_or_else(|| {
193-
parse_value_path(log_schema().host_key())
194-
.expect("global log_schema.host_key to be valid path")
195-
});
193+
.or(log_schema().host_key().cloned())
194+
.expect("global log_schema.host_key to be valid path");
196195

197196
let message_key = self
198197
.message_key
@@ -409,10 +408,10 @@ mod tests {
409408
use futures::{channel::mpsc, stream, StreamExt};
410409
use http::{request::Parts, StatusCode};
411410
use indoc::indoc;
411+
412412
use lookup::owned_value_path;
413413
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
414414

415-
use super::*;
416415
use crate::{
417416
sinks::{
418417
influxdb::test_util::{assert_fields, split_line_protocol, ts},
@@ -427,6 +426,8 @@ mod tests {
427426
},
428427
};
429428

429+
use super::*;
430+
430431
type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;
431432

432433
#[test]
@@ -880,16 +881,17 @@ mod tests {
880881
#[cfg(feature = "influxdb-integration-tests")]
881882
#[cfg(test)]
882883
mod integration_tests {
884+
use std::sync::Arc;
885+
883886
use chrono::Utc;
884-
use codecs::BytesDeserializerConfig;
885887
use futures::stream;
888+
use vrl::value;
889+
890+
use codecs::BytesDeserializerConfig;
886891
use lookup::{owned_value_path, path};
887-
use std::sync::Arc;
888892
use vector_core::config::{LegacyKey, LogNamespace};
889893
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
890-
use vrl::value;
891894

892-
use super::*;
893895
use crate::{
894896
config::SinkContext,
895897
sinks::influxdb::{
@@ -900,6 +902,8 @@ mod integration_tests {
900902
test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
901903
};
902904

905+
use super::*;
906+
903907
#[tokio::test]
904908
async fn influxdb2_logs_put_data() {
905909
let endpoint = address_v2();

src/sinks/splunk_hec/common/util.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,10 @@ pub fn build_uri(
132132
uri.parse::<Uri>()
133133
}
134134

135-
pub fn host_key() -> String {
136-
crate::config::log_schema().host_key().to_string()
135+
pub fn config_host_key() -> OptionalValuePath {
136+
OptionalValuePath {
137+
path: crate::config::log_schema().host_key().cloned(),
138+
}
137139
}
138140

139141
pub fn config_timestamp_key() -> OptionalValuePath {

0 commit comments

Comments
 (0)