Skip to content

console configuration should be optional #737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Console endpoint /config prints Router Config instead of returning console settings
- v4 config is optional, user can specify v4 and/or v5 config
- websocket feature is enabled by default
- console configuration is optional

### Deprecated
- "websockets" feature is removed in favour of "websocket"
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct Config {
pub v5: Option<HashMap<String, ServerSettings>>,
pub ws: Option<HashMap<String, ServerSettings>>,
pub cluster: Option<ClusterSettings>,
pub console: ConsoleSettings,
pub console: Option<ConsoleSettings>,
pub bridge: Option<BridgeConfig>,
pub prometheus: Option<PrometheusSetting>,
pub metrics: Option<HashMap<MetricType, MetricSettings>>,
Expand Down
5 changes: 4 additions & 1 deletion rumqttd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ fn main() {
};

let mut configs: rumqttd::Config = config_builder.build().unwrap().try_deserialize().unwrap();
configs.console.set_filter_reload_handle(reload_handle);

if let Some(console_config) = configs.console.as_mut() {
console_config.set_filter_reload_handle(reload_handle)
}

validate_config(&configs);

Expand Down
37 changes: 29 additions & 8 deletions rumqttd/src/server/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ impl Broker {
));
}

// we don't know which servers (v4/v5/ws) user will spawn
// so we collect handles for all of the spawned servers
let mut server_thread_handles = Vec::new();

if let Some(metrics_config) = self.config.metrics.clone() {
let timer_thread = thread::Builder::new().name("timer".to_owned());
let router_tx = self.router_tx.clone();
Expand Down Expand Up @@ -200,7 +204,7 @@ impl Broker {
for (_, config) in v4_config.clone() {
let server_thread = thread::Builder::new().name(config.name.clone());
let mut server = Server::new(config, self.router_tx.clone(), V4);
server_thread.spawn(move || {
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

Expand All @@ -210,14 +214,15 @@ impl Broker {
}
});
})?;
server_thread_handles.push(handle)
}
}

if let Some(v5_config) = &self.config.v5 {
for (_, config) in v5_config.clone() {
let server_thread = thread::Builder::new().name(config.name.clone());
let mut server = Server::new(config, self.router_tx.clone(), V5);
server_thread.spawn(move || {
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

Expand All @@ -227,6 +232,7 @@ impl Broker {
}
});
})?;
server_thread_handles.push(handle)
}
}

Expand All @@ -241,7 +247,7 @@ impl Broker {
let server_thread = thread::Builder::new().name(config.name.clone());
//TODO: Add support for V5 procotol with websockets. Registered in config or on ServerSettings
let mut server = Server::new(config, self.router_tx.clone(), V4);
server_thread.spawn(move || {
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

Expand All @@ -251,6 +257,7 @@ impl Broker {
}
});
})?;
server_thread_handles.push(handle)
}
}

Expand Down Expand Up @@ -297,12 +304,26 @@ impl Broker {
})?;
}

let console_link = ConsoleLink::new(self.config.console.clone(), self.router_tx.clone());
if let Some(console) = self.config.console.clone() {
let console_link = ConsoleLink::new(console, self.router_tx.clone());

let console_link = Arc::new(console_link);
let console_thread = thread::Builder::new().name("Console".to_string());
console_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();
runtime.block_on(console::start(console_link));
})?;
}

let console_link = Arc::new(console_link);
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();
runtime.block_on(console::start(console_link));
// in ideal case, where server doesn't crash, join() will never resolve
// we still try to join threads so that we don't return from function
// unless everything crashes.
server_thread_handles.into_iter().for_each(|handle| {
// join() might panic in case the thread panics
// we just ignore it
let _ = handle.join();
});

Ok(())
}
Expand Down