Skip to content

Commit 0987e27

Browse files
authored
refactor: graceful shutdown on meta node & unify election path (#17608)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 4d34bd3 commit 0987e27

File tree

13 files changed

+244
-278
lines changed

13 files changed

+244
-278
lines changed

src/cmd/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ pub fn compute(opts: ComputeNodeOpts) -> ! {
4444

4545
pub fn meta(opts: MetaNodeOpts) -> ! {
4646
init_risingwave_logger(LoggerSettings::from_opts(&opts));
47-
// TODO(shutdown): pass the shutdown token
48-
main_okk(|_| risingwave_meta_node::start(opts));
47+
main_okk(|shutdown| risingwave_meta_node::start(opts, shutdown));
4948
}
5049

5150
pub fn frontend(opts: FrontendOpts) -> ! {

src/cmd_all/src/standalone.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,10 @@ pub async fn standalone(
194194
is_in_memory = matches!(opts.backend, Some(MetaBackend::Mem));
195195
tracing::info!("starting meta-node thread with cli args: {:?}", opts);
196196

197+
let shutdown = shutdown.clone();
197198
let _meta_handle = tokio::spawn(async move {
198199
let dangerous_max_idle_secs = opts.dangerous_max_idle_secs;
199-
risingwave_meta_node::start(opts).await;
200+
risingwave_meta_node::start(opts, shutdown).await;
200201
tracing::warn!("meta is stopped, shutdown all nodes");
201202
if let Some(idle_exit_secs) = dangerous_max_idle_secs {
202203
eprintln!("{}",

src/meta/node/src/lib.rs

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use redact::Secret;
2626
use risingwave_common::config::OverrideConfig;
2727
use risingwave_common::util::meta_addr::MetaAddressStrategy;
2828
use risingwave_common::util::resource_util;
29+
use risingwave_common::util::tokio_util::sync::CancellationToken;
2930
use risingwave_common::{GIT_SHA, RW_VERSION};
3031
use risingwave_common_heap_profiling::HeapProfiler;
3132
use risingwave_meta::*;
@@ -204,7 +205,10 @@ use risingwave_common::config::{load_config, MetaBackend, RwConfig};
204205
use tracing::info;
205206

206207
/// Start meta node
207-
pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
208+
pub fn start(
209+
opts: MetaNodeOpts,
210+
shutdown: CancellationToken,
211+
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
208212
// WARNING: don't change the function signature. Making it `async fn` will cause
209213
// slow compile in release mode.
210214
Box::pin(async move {
@@ -324,7 +328,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
324328
max_timeout_ms / 1000
325329
} + MIN_TIMEOUT_INTERVAL_SEC;
326330

327-
let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
331+
rpc_serve(
328332
add_info,
329333
backend,
330334
max_heartbeat_interval,
@@ -428,42 +432,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
428432
},
429433
config.system.into_init_system_params(),
430434
Default::default(),
435+
shutdown,
431436
)
432437
.await
433438
.unwrap();
434-
435-
tracing::info!("Meta server listening at {}", listen_addr);
436-
437-
match leader_lost_handle {
438-
None => {
439-
tokio::select! {
440-
_ = tokio::signal::ctrl_c() => {
441-
tracing::info!("receive ctrl+c");
442-
shutdown_send.send(()).unwrap();
443-
join_handle.await.unwrap()
444-
}
445-
res = &mut join_handle => res.unwrap(),
446-
};
447-
}
448-
Some(mut handle) => {
449-
tokio::select! {
450-
_ = &mut handle => {
451-
tracing::info!("receive leader lost signal");
452-
// When we lose leadership, we will exit as soon as possible.
453-
}
454-
_ = tokio::signal::ctrl_c() => {
455-
tracing::info!("receive ctrl+c");
456-
shutdown_send.send(()).unwrap();
457-
join_handle.await.unwrap();
458-
handle.abort();
459-
}
460-
res = &mut join_handle => {
461-
res.unwrap();
462-
handle.abort();
463-
},
464-
};
465-
}
466-
};
467439
})
468440
}
469441

0 commit comments

Comments
 (0)