Skip to content

fix(config): Fix preloading log_schema #17759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,14 @@ pub async fn load_configs(
paths = ?config_paths.iter().map(<&PathBuf>::from).collect::<Vec<_>>()
);

// config::init_log_schema should be called before initializing sources.
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;

let mut config =
config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler)
.await
.map_err(handle_config_errors)?;
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(config.global.log_schema.clone(), true);

config::init_telemetry(config.global.telemetry.clone(), true);

Expand Down
12 changes: 11 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,19 @@ pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX};
pub use vector_core::config::{
init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId,
};

/// Loads Log Schema from configurations and sets global schema.
/// Once this is done, configurations can be correctly loaded using
/// configured log schema defaults.
/// If deny is set, will panic if schema has already been set.
pub fn init_log_schema(config_paths: &[ConfigPath], deny_if_set: bool) -> Result<(), Vec<String>> {
let (builder, _) = load_builder_from_paths(config_paths)?;
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
Ok(())
}

#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
File(PathBuf, FormatHint),
Expand Down
15 changes: 1 addition & 14 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,11 @@ impl UnitTest {
}
}

/// Loads Log Schema from configurations and sets global schema.
/// Once this is done, configurations can be correctly loaded using
/// configured log schema defaults.
/// If deny is set, will panic if schema has already been set.
fn init_log_schema_from_paths(
config_paths: &[ConfigPath],
deny_if_set: bool,
) -> Result<(), Vec<String>> {
let (builder, _) = config::loading::load_builder_from_paths(config_paths)?;
vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set);
Ok(())
}

pub async fn build_unit_tests_main(
paths: &[ConfigPath],
signal_handler: &mut signal::SignalHandler,
) -> Result<Vec<UnitTest>, Vec<String>> {
init_log_schema_from_paths(paths, false)?;
config::init_log_schema(paths, false)?;
let (mut secrets_backends_loader, _) = loading::load_secret_backends_from_paths(paths)?;
let (config_builder, _) = if secrets_backends_loader.has_secrets_to_retrieve() {
let resolved_secrets = secrets_backends_loader
Expand Down
4 changes: 3 additions & 1 deletion src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
fmt.title(format!("Failed to load {:?}", &paths_list));
fmt.sub_error(errors);
};
config::init_log_schema(&paths, true)
.map_err(&mut report_error)
.ok()?;
let (builder, load_warnings) = config::load_builder_from_paths(&paths)
.map_err(&mut report_error)
.ok()?;
config::init_log_schema(builder.global.log_schema.clone(), true);

// Build
let (config, build_warnings) = builder
Expand Down
72 changes: 72 additions & 0 deletions tests/integration/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
fs::create_dir,
fs::read_dir,
io::Write,
net::SocketAddr,
Expand Down Expand Up @@ -200,6 +201,77 @@ fn log_schema() {
assert_eq!(event["test_msg"], json!("42"));
}

#[test]
fn log_schema_multiple_config_files() {
// Vector command
let mut cmd = Command::cargo_bin("vector").unwrap();

let config_dir = create_directory();

let sinks_config_dir = config_dir.join("sinks");
create_dir(sinks_config_dir.clone()).unwrap();

let sources_config_dir = config_dir.join("sources");
create_dir(sources_config_dir.clone()).unwrap();

let input_dir = create_directory();
let input_file = input_dir.join("input_file");

overwrite_file(
config_dir.join("vector.toml"),
r#"
data_dir = "${VECTOR_DATA_DIR}"
log_schema.host_key = "test_host"
"#,
);

overwrite_file(
sources_config_dir.join("in_file.toml"),
r#"
type = "file"
include = ["${VECTOR_TEST_INPUT_FILE}"]
"#,
);

overwrite_file(
sinks_config_dir.join("out_console.toml"),
r#"
inputs = ["in_file"]
type = "console"
encoding.codec = "json"
"#,
);

overwrite_file(
input_file.clone(),
r#"42
"#,
);

cmd.arg("--quiet")
.env("VECTOR_CONFIG_DIR", config_dir)
.env("VECTOR_DATA_DIR", create_directory())
.env("VECTOR_TEST_INPUT_FILE", input_file.clone());

// Run vector
let vector = cmd.stdout(std::process::Stdio::piped()).spawn().unwrap();

// Give vector time to start.
sleep(STARTUP_TIME);

// Signal shutdown
kill(Pid::from_raw(vector.id() as i32), Signal::SIGTERM).unwrap();

// Wait for shutdown
let output = vector.wait_with_output().unwrap();
assert!(output.status.success(), "Vector didn't exit successfully.");

// Output
let event: Value = serde_json::from_slice(output.stdout.as_slice()).unwrap();
assert_eq!(event["message"], json!("42"));
assert_eq!(event["test_host"], json!("runner"));
}

#[test]
fn configuration_path_recomputed() {
// Directory with configuration files
Expand Down