Skip to content

Commit 852227e

Browse files
authored
Merge branch 'vectordotdev:master' into master
2 parents 387008c + e2b83f2 commit 852227e

File tree

7 files changed

+198
-100
lines changed

7 files changed

+198
-100
lines changed

.github/workflows/publish.yml

Lines changed: 55 additions & 64 deletions
Large diffs are not rendered by default.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ members = [
133133
]
134134

135135
[workspace.dependencies]
136+
cfg-if = { version = "1.0.0", default-features = false }
136137
chrono = { version = "0.4.38", default-features = false, features = ["clock", "serde"] }
137138
chrono-tz = { version = "0.10.0", default-features = false, features = ["serde"] }
138139
clap = { version = "4.5.20", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] }

lib/vector-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ vector-config = { path = "../vector-config" }
7070
vector-config-common = { path = "../vector-config-common" }
7171
vector-config-macros = { path = "../vector-config-macros" }
7272
vrl.workspace = true
73+
cfg-if.workspace = true
7374

7475
[target.'cfg(target_os = "macos")'.dependencies]
7576
security-framework = "2.10.0"

lib/vector-core/src/config/mod.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
3030
// This enum should be kept alphabetically sorted as the bitmask value is used when
3131
// sorting sources by data type in the GraphQL API.
3232
#[bitmask(u8)]
33+
#[bitmask_config(flags_iter)]
3334
pub enum DataType {
3435
Log,
3536
Metric,
@@ -38,11 +39,11 @@ pub enum DataType {
3839

3940
impl fmt::Display for DataType {
4041
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41-
let mut t = Vec::new();
42-
self.contains(DataType::Log).then(|| t.push("Log"));
43-
self.contains(DataType::Metric).then(|| t.push("Metric"));
44-
self.contains(DataType::Trace).then(|| t.push("Trace"));
45-
f.write_str(&t.join(","))
42+
f.debug_list()
43+
.entries(
44+
Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
45+
)
46+
.finish()
4647
}
4748
}
4849

@@ -191,6 +192,24 @@ impl SourceOutput {
191192
}
192193
}
193194

195+
fn fmt_helper(
196+
f: &mut fmt::Formatter<'_>,
197+
maybe_port: Option<&String>,
198+
data_type: DataType,
199+
) -> fmt::Result {
200+
match maybe_port {
201+
Some(port) => write!(f, "port: \"{port}\",",),
202+
None => write!(f, "port: None,"),
203+
}?;
204+
write!(f, " types: {data_type}")
205+
}
206+
207+
impl fmt::Display for SourceOutput {
208+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209+
fmt_helper(f, self.port.as_ref(), self.ty)
210+
}
211+
}
212+
194213
#[derive(Debug, Clone, PartialEq)]
195214
pub struct TransformOutput {
196215
pub port: Option<String>,
@@ -203,6 +222,12 @@ pub struct TransformOutput {
203222
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
204223
}
205224

225+
impl fmt::Display for TransformOutput {
226+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227+
fmt_helper(f, self.port.as_ref(), self.ty)
228+
}
229+
}
230+
206231
impl TransformOutput {
207232
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
208233
/// Designed for use in transforms.

lib/vector-core/src/config/telemetry.rs

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,77 @@
1-
use std::sync::{LazyLock, OnceLock};
2-
1+
use cfg_if::cfg_if;
32
use vector_common::request_metadata::GroupedCountByteSize;
43
use vector_config::configurable_component;
54

6-
static TELEMETRY: OnceLock<Telemetry> = OnceLock::new();
7-
static TELEMETRY_DEFAULT: LazyLock<Telemetry> = LazyLock::new(Telemetry::default);
8-
9-
/// Loads the telemetry options from configurations and sets the global options.
10-
/// Once this is done, configurations can be correctly loaded using configured
11-
/// log schema defaults.
12-
///
13-
/// # Errors
14-
///
15-
/// This function will fail if the `builder` fails.
16-
///
17-
/// # Panics
18-
///
19-
/// If deny is set, will panic if telemetry has already been set.
20-
pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
21-
assert!(
22-
!(TELEMETRY.set(telemetry).is_err() && deny_if_set),
23-
"Couldn't set telemetry"
24-
);
25-
}
5+
cfg_if! {
6+
// The telemetry code assumes a process wide singleton. When running `cargo test`,
7+
// multiple threads might try to read/write this global.
8+
if #[cfg(any(test, feature = "test"))] {
9+
use std::sync::{Arc, Mutex};
10+
11+
thread_local! {
12+
static TELEMETRY: Arc<Mutex<Option<Telemetry>>> = Arc::new(Mutex::new(None));
13+
}
14+
15+
/// Test implementation.
16+
///
17+
/// # Panics
18+
///
19+
/// If deny is set, will panic if telemetry has already been set.
20+
/// Also, panics if the lock is poisoned.
21+
pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
22+
TELEMETRY.with(|tl| {
23+
let mut tl = tl.lock().expect("telemetry lock poisoned");
24+
assert!(!(tl.is_some() && deny_if_set), "Couldn't set telemetry");
25+
*tl = Some(telemetry);
26+
});
27+
}
28+
29+
/// Test implementation.
30+
///
31+
/// # Panics
32+
///
33+
/// If the lock is poisoned.
34+
pub fn telemetry() -> Telemetry {
35+
TELEMETRY.with(|tl| {
36+
let mut tl = tl.lock().expect("telemetry lock poisoned");
37+
// For non-test code we return `TELEMETRY_DEFAULT`.
38+
// For test code, we will instantiate a default instance per thread.
39+
if tl.is_none() {
40+
*tl = Some(Telemetry::default());
41+
}
42+
tl.clone().unwrap()
43+
})
44+
}
45+
}
46+
else {
47+
use std::sync::{LazyLock, OnceLock};
2648

27-
/// Returns the telemetry configuration options.
28-
pub fn telemetry() -> &'static Telemetry {
29-
TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT)
49+
static TELEMETRY: OnceLock<Telemetry> = OnceLock::new();
50+
static TELEMETRY_DEFAULT: LazyLock<Telemetry> = LazyLock::new(Telemetry::default);
51+
52+
/// Loads the telemetry options from configurations and sets the global options.
53+
/// Once this is done, configurations can be correctly loaded using configured
54+
/// log schema defaults.
55+
///
56+
/// # Errors
57+
///
58+
/// This function will fail if the `builder` fails.
59+
///
60+
/// # Panics
61+
///
62+
/// If deny is set, will panic if telemetry has already been set.
63+
pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
64+
assert!(
65+
!(TELEMETRY.set(telemetry).is_err() && deny_if_set),
66+
"Couldn't set telemetry"
67+
);
68+
}
69+
70+
/// Returns the telemetry configuration options.
71+
pub fn telemetry() -> &'static Telemetry {
72+
TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT)
73+
}
74+
}
3075
}
3176

3277
/// Sets options for the telemetry that Vector emits.

src/config/graph.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use indexmap::{set::IndexSet, IndexMap};
2-
use std::collections::{HashMap, HashSet, VecDeque};
3-
41
use super::{
52
schema, ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
63
TransformOutput,
74
};
5+
use indexmap::{set::IndexSet, IndexMap};
6+
use std::collections::{HashMap, HashSet, VecDeque};
7+
use std::fmt;
88

99
#[derive(Debug, Clone)]
1010
pub enum Node {
@@ -20,6 +20,33 @@ pub enum Node {
2020
},
2121
}
2222

23+
impl fmt::Display for Node {
24+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25+
match self {
26+
Node::Source { outputs } => {
27+
write!(f, "component_kind: source\n outputs:")?;
28+
for output in outputs {
29+
write!(f, "\n {}", output)?;
30+
}
31+
Ok(())
32+
}
33+
Node::Transform { in_ty, outputs } => {
34+
write!(
35+
f,
36+
"component_kind: source\n input_types: {in_ty}\n outputs:"
37+
)?;
38+
for output in outputs {
39+
write!(f, "\n {}", output)?;
40+
}
41+
Ok(())
42+
}
43+
Node::Sink { ty } => {
44+
write!(f, "component_kind: sink\n types: {ty}")
45+
}
46+
}
47+
}
48+
}
49+
2350
#[derive(Debug, Clone)]
2451
struct Edge {
2552
from: OutputId,
@@ -139,9 +166,16 @@ impl Graph {
139166
Some(Node::Sink { .. }) => "sink",
140167
_ => panic!("only transforms and sinks have inputs"),
141168
};
169+
info!(
170+
"Available components:\n{}",
171+
self.nodes
172+
.iter()
173+
.map(|(key, node)| format!("\"{}\":\n {}", key, node))
174+
.collect::<Vec<_>>()
175+
.join("\n")
176+
);
142177
Err(format!(
143-
"Input \"{}\" for {} \"{}\" doesn't match any components.",
144-
from, output_type, to
178+
"Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
145179
))
146180
}
147181
}
@@ -510,7 +544,7 @@ mod test {
510544

511545
assert_eq!(
512546
Err(vec![
513-
"Data type mismatch between in (Log) and out (Metric)".into()
547+
"Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
514548
]),
515549
graph.typecheck()
516550
);

0 commit comments

Comments
 (0)