Skip to content

Commit 15b6f48

Browse files
Hextabruceg
andauthored
fix(config): Fix preloading log_schema (#17759)
Preload global parameter log_schema before compiling sources config. It fixes an issue with globally configured log_schema and sources in separate files. Revert 3e971fb --------- Signed-off-by: Artur Malchanau <[email protected]> Co-authored-by: Bruce Guenter <[email protected]>
1 parent cd397ab commit 15b6f48

File tree

5 files changed

+91
-18
lines changed

5 files changed

+91
-18
lines changed

src/app.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -466,12 +466,14 @@ pub async fn load_configs(
466466
paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
467467
);
468468

469+
// config::init_log_schema should be called before initializing sources.
470+
#[cfg(not(feature = "enterprise-tests"))]
471+
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;
472+
469473
let mut config =
470474
config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler)
471475
.await
472476
.map_err(handle_config_errors)?;
473-
#[cfg(not(feature = "enterprise-tests"))]
474-
config::init_log_schema(config.global.log_schema.clone(), true);
475477

476478
config::init_telemetry(config.global.telemetry.clone(), true);
477479

src/config/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,19 @@ pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
6363
pub use validation::warnings;
6464
pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX};
6565
pub use vector_core::config::{
66-
init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
66+
init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
6767
};
6868

69+
/// Loads Log Schema from configurations and sets global schema.
70+
/// Once this is done, configurations can be correctly loaded using
71+
/// configured log schema defaults.
72+
/// If deny is set, will panic if schema has already been set.
73+
pub fn init_log_schema(config_paths: &[ConfigPath], deny_if_set: bool) -> Result<(), Vec<String>> {
74+
let (builder, _) = load_builder_from_paths(config_paths)?;
75+
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
76+
Ok(())
77+
}
78+
6979
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
7080
pub enum ConfigPath {
7181
File(PathBuf, FormatHint),

src/config/unit_test/mod.rs

+1-14
Original file line numberDiff line numberDiff line change
@@ -72,24 +72,11 @@ impl UnitTest {
7272
}
7373
}
7474

75-
/// Loads Log Schema from configurations and sets global schema.
76-
/// Once this is done, configurations can be correctly loaded using
77-
/// configured log schema defaults.
78-
/// If deny is set, will panic if schema has already been set.
79-
fn init_log_schema_from_paths(
80-
config_paths: &[ConfigPath],
81-
deny_if_set: bool,
82-
) -> Result<(), Vec<String>> {
83-
let (builder, _) = config::loading::load_builder_from_paths(config_paths)?;
84-
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
85-
Ok(())
86-
}
87-
8875
pub async fn build_unit_tests_main(
8976
paths: &[ConfigPath],
9077
signal_handler: &mut signal::SignalHandler,
9178
) -> Result<Vec<UnitTest>, Vec<String>> {
92-
init_log_schema_from_paths(paths, false)?;
79+
config::init_log_schema(paths, false)?;
9380
let (mut secrets_backends_loader, _) = loading::load_secret_backends_from_paths(paths)?;
9481
let (config_builder, _) = if secrets_backends_loader.has_secrets_to_retrieve() {
9582
let resolved_secrets = secrets_backends_loader

src/validate.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,12 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
135135
fmt.title(format!("Failed to load {:?}", &paths_list));
136136
fmt.sub_error(errors);
137137
};
138+
config::init_log_schema(&paths, true)
139+
.map_err(&mut report_error)
140+
.ok()?;
138141
let (builder, load_warnings) = config::load_builder_from_paths(&paths)
139142
.map_err(&mut report_error)
140143
.ok()?;
141-
config::init_log_schema(builder.global.log_schema.clone(), true);
142144

143145
// Build
144146
let (config, build_warnings) = builder

tests/integration/shutdown.rs

+72
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
fs::create_dir,
23
fs::read_dir,
34
io::Write,
45
net::SocketAddr,
@@ -200,6 +201,77 @@ fn log_schema() {
200201
assert_eq!(event["test_msg"], json!("42"));
201202
}
202203

204+
#[test]
205+
fn log_schema_multiple_config_files() {
206+
// Vector command
207+
let mut cmd = Command::cargo_bin("vector").unwrap();
208+
209+
let config_dir = create_directory();
210+
211+
let sinks_config_dir = config_dir.join("sinks");
212+
create_dir(sinks_config_dir.clone()).unwrap();
213+
214+
let sources_config_dir = config_dir.join("sources");
215+
create_dir(sources_config_dir.clone()).unwrap();
216+
217+
let input_dir = create_directory();
218+
let input_file = input_dir.join("input_file");
219+
220+
overwrite_file(
221+
config_dir.join("vector.toml"),
222+
r#"
223+
data_dir = "${VECTOR_DATA_DIR}"
224+
log_schema.host_key = "test_host"
225+
"#,
226+
);
227+
228+
overwrite_file(
229+
sources_config_dir.join("in_file.toml"),
230+
r#"
231+
type = "file"
232+
include = ["${VECTOR_TEST_INPUT_FILE}"]
233+
"#,
234+
);
235+
236+
overwrite_file(
237+
sinks_config_dir.join("out_console.toml"),
238+
r#"
239+
inputs = ["in_file"]
240+
type = "console"
241+
encoding.codec = "json"
242+
"#,
243+
);
244+
245+
overwrite_file(
246+
input_file.clone(),
247+
r#"42
248+
"#,
249+
);
250+
251+
cmd.arg("--quiet")
252+
.env("VECTOR_CONFIG_DIR", config_dir)
253+
.env("VECTOR_DATA_DIR", create_directory())
254+
.env("VECTOR_TEST_INPUT_FILE", input_file.clone());
255+
256+
// Run vector
257+
let vector = cmd.stdout(std::process::Stdio::piped()).spawn().unwrap();
258+
259+
// Give vector time to start.
260+
sleep(STARTUP_TIME);
261+
262+
// Signal shutdown
263+
kill(Pid::from_raw(vector.id() as i32), Signal::SIGTERM).unwrap();
264+
265+
// Wait for shutdown
266+
let output = vector.wait_with_output().unwrap();
267+
assert!(output.status.success(), "Vector didn't exit successfully.");
268+
269+
// Output
270+
let event: Value = serde_json::from_slice(output.stdout.as_slice()).unwrap();
271+
assert_eq!(event["message"], json!("42"));
272+
assert_eq!(event["test_host"], json!("runner"));
273+
}
274+
203275
#[test]
204276
fn configuration_path_recomputed() {
205277
// Directory with configuration files

0 commit comments

Comments
 (0)