
Commit b4bf3c4

Exit when chitchat server fails (#5819)
* Exit when chitchat server fails
* Fix test hanging
* Better describe shutdown signal handler
* Fix 2024 impl capture
* Upgrade to chitchat main branch
1 parent 76b632b commit b4bf3c4
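
In short, the serve future in quickwit-serve/src/lib.rs now joins the gRPC and REST join handles together with a chitchat server termination watcher, so a failure of the gossip server makes the whole node exit instead of leaving it running half-alive. A minimal, self-contained sketch of that mechanism, using plain tokio and anyhow with stand-in futures rather than Quickwit's actual types:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-ins for the gRPC and REST join handles: they would normally run
    // for the lifetime of the node.
    let grpc_join_handle = async {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        Ok::<(), anyhow::Error>(())
    };
    let rest_join_handle = async {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        Ok::<(), anyhow::Error>(())
    };
    // Stand-in for the chitchat server termination watcher: it resolves with
    // an error as soon as the gossip server dies.
    let chitchat_server_handle = async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Err::<(), anyhow::Error>(anyhow::anyhow!("chitchat server failed"))
    };

    // try_join! returns the first error even though the other futures are
    // still pending: the node logs it and exits instead of lingering.
    if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) {
        eprintln!("server failed: {err:?}");
    }
}

Here the stand-in watcher errors after 100 ms, and try_join! surfaces that error immediately even though the other two futures are still pending, which is exactly the exit behavior this commit adds.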

5 files changed (+72 -32 lines)


quickwit/Cargo.lock

Lines changed: 1 addition & 1 deletion
(Generated file; diff not rendered by default.)

quickwit/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ binggan = { version = "0.14" }
 bytes = { version = "1", features = ["serde"] }
 bytesize = { version = "1.3", features = ["serde"] }
 bytestring = "1.4"
-chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "13968f0" }
+chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "bd54c81" }
 chrono = { version = "0.4", default-features = false, features = [
     "clock",
     "std",

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 22 additions & 7 deletions
@@ -380,17 +380,21 @@ impl Cluster {
         }
     }
 
-    /// Leaves the cluster.
-    pub async fn shutdown(self) {
+    #[cfg(any(test, feature = "testsuite"))]
+    pub async fn leave(&self) {
         info!(
             cluster_id=%self.cluster_id,
             node_id=%self.self_chitchat_id.node_id,
-            "Leaving the cluster."
+            "leaving the cluster"
         );
         self.set_self_node_readiness(false).await;
         tokio::time::sleep(self.gossip_interval * 2).await;
     }
 
+    pub async fn initiate_shutdown(&self) -> anyhow::Result<()> {
+        self.inner.read().await.chitchat_handle.initiate_shutdown()
+    }
+
     /// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines.
     /// The metrics are exposed as follows:
     /// Key: pipeline_metrics:<index_uid>:<source_id>

@@ -433,6 +437,16 @@ impl Cluster {
     pub async fn chitchat(&self) -> Arc<Mutex<Chitchat>> {
         self.inner.read().await.chitchat_handle.chitchat()
     }
+
+    pub async fn chitchat_server_termination_watcher(
+        &self,
+    ) -> impl Future<Output = anyhow::Result<()>> + use<> {
+        self.inner
+            .read()
+            .await
+            .chitchat_handle
+            .termination_watcher()
+    }
 }
 
 impl ClusterChangeStreamFactory for Cluster {

@@ -834,7 +848,7 @@ mod tests {
             self_node_state.get(READINESS_KEY).unwrap(),
             READINESS_VALUE_NOT_READY
         );
-        node.shutdown().await;
+        node.leave().await;
     }
 
     #[tokio::test]

@@ -871,19 +885,20 @@ mod tests {
         expected_members.sort();
         assert_eq!(members, expected_members);
 
-        node_2.shutdown().await;
+        node_2.leave().await;
         node_1
             .wait_for_ready_members(|members| members.len() == 2, wait_secs)
             .await
             .unwrap();
 
-        node_3.shutdown().await;
+        node_3.leave().await;
         node_1
             .wait_for_ready_members(|members| members.len() == 1, wait_secs)
             .await
             .unwrap();
 
-        node_1.shutdown().await;
+        node_1.leave().await;
+        drop(node_1);
 
         let cluster_changes: Vec<ClusterChange> = node_1_change_stream.collect().await;
         assert_eq!(cluster_changes.len(), 6);
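
A note on the `+ use<>` in `chitchat_server_termination_watcher` above (the "Fix 2024 impl capture" item in the commit message): under the Rust 2024 capture rules, a return-position `impl Trait` on a method captures the `&self` lifetime by default, so without `use<>` the returned future could not outlive the borrow of the `Cluster`. A small, hypothetical example of the same pattern; the `Gossip` type is invented for illustration and assumes edition 2024:

use std::future::Future;

// `Gossip` is a made-up stand-in, only to show the capture rule; it is not
// the chitchat handle used by Quickwit.
struct Gossip {
    name: String,
}

impl Gossip {
    // As in the diff above: the returned future captures nothing from `&self`
    // (`use<>`), so it can outlive the borrow. We clone what we need up front.
    async fn termination_watcher(&self) -> impl Future<Output = Result<(), String>> + use<> {
        let name = self.name.clone();
        async move {
            // Placeholder body; a real watcher would await the server task.
            println!("{name}: gossip server terminated");
            Ok(())
        }
    }
}

#[tokio::main]
async fn main() {
    let gossip = Gossip { name: "node-1".to_string() };
    // The watcher is obtained while `gossip` is borrowed...
    let watcher = gossip.termination_watcher().await;
    drop(gossip);
    // ...but thanks to `use<>` it is 'static and can still be awaited here.
    watcher.await.unwrap();
}

Without `use<>`, the `drop(gossip)` before `watcher.await` becomes a borrow-check error under edition 2024.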

quickwit/quickwit-control-plane/src/tests.rs

Lines changed: 1 addition & 1 deletion
@@ -387,7 +387,7 @@ async fn test_scheduler_scheduling_multiple_indexers() {
     assert_eq!(scheduler_state.num_schedule_indexing_plan, 1);
 
     // Shutdown cluster and wait until the new scheduling.
-    cluster_indexer_2.shutdown().await;
+    cluster_indexer_2.leave().await;
 
     cluster
         .wait_for_ready_members(

quickwit/quickwit-serve/src/lib.rs

Lines changed: 47 additions & 22 deletions
@@ -395,6 +395,41 @@ fn start_shard_positions_service(
     });
 }
 
+/// Waits for the shutdown signal and notifies all other services when it
+/// occurs.
+///
+/// Usually called when receiving a SIGTERM signal, e.g. k8s trying to
+/// decomission a pod.
+async fn shutdown_signal_handler(
+    shutdown_signal: BoxFutureInfaillible<()>,
+    universe: Universe,
+    ingester_opt: Option<Ingester>,
+    grpc_shutdown_trigger_tx: oneshot::Sender<()>,
+    rest_shutdown_trigger_tx: oneshot::Sender<()>,
+    cluster: Cluster,
+) -> HashMap<String, ActorExitStatus> {
+    shutdown_signal.await;
+    // We must decommission the ingester first before terminating the indexing pipelines that
+    // may consume from it. We also need to keep the gRPC server running while doing so.
+    if let Some(ingester) = ingester_opt {
+        if let Err(error) = wait_for_ingester_decommission(ingester).await {
+            error!("failed to decommission ingester gracefully: {:?}", error);
+        }
+    }
+    let actor_exit_statuses = universe.quit().await;
+
+    if grpc_shutdown_trigger_tx.send(()).is_err() {
+        debug!("gRPC server shutdown signal receiver was dropped");
+    }
+    if rest_shutdown_trigger_tx.send(()).is_err() {
+        debug!("REST server shutdown signal receiver was dropped");
+    }
+    if let Err(err) = cluster.initiate_shutdown().await {
+        debug!("{err}");
+    }
+    actor_exit_statuses
+}
+
 pub async fn serve_quickwit(
     node_config: NodeConfig,
     runtimes_config: RuntimesConfig,
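
The `grpc_shutdown_trigger_tx` and `rest_shutdown_trigger_tx` senders above are one half of a common tokio pattern: each server holds the matching `oneshot::Receiver` and stops once it fires, while the sending side only logs when the receiver is already gone. A generic, runnable sketch of that pattern with a toy task standing in for the server (this is not Quickwit's actual server wiring):

use std::time::Duration;

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (shutdown_trigger_tx, shutdown_trigger_rx) = oneshot::channel::<()>();

    // Toy "server": runs until the shutdown trigger fires, or forever otherwise.
    let server = tokio::spawn(async move {
        tokio::select! {
            _ = shutdown_trigger_rx => println!("server: shutdown trigger received, stopping"),
            _ = tokio::time::sleep(Duration::from_secs(3600)) => println!("server: finished on its own"),
        }
    });

    // Toy shutdown handler: fire the trigger; if the server side is already
    // gone, `send` fails and we only log it, mirroring the `debug!` above.
    if shutdown_trigger_tx.send(()).is_err() {
        eprintln!("server shutdown signal receiver was dropped");
    }

    server.await.unwrap();
}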
@@ -757,7 +792,7 @@ pub async fn serve_quickwit(
     // Thus readiness task is started once gRPC and REST servers are started.
     spawn_named_task(
         node_readiness_reporting_task(
-            cluster,
+            cluster.clone(),
             metastore_through_control_plane,
             ingester_opt.clone(),
             grpc_readiness_signal_rx,
@@ -767,26 +802,14 @@ pub async fn serve_quickwit(
         "node_readiness_reporting",
     );
 
-    let shutdown_handle = tokio::spawn(async move {
-        shutdown_signal.await;
-
-        // We must decommission the ingester first before terminating the indexing pipelines that
-        // may consume from it. We also need to keep the gRPC server running while doing so.
-        if let Some(ingester) = ingester_opt {
-            if let Err(error) = wait_for_ingester_decommission(ingester).await {
-                error!("failed to decommission ingester gracefully: {:?}", error);
-            }
-        }
-        let actor_exit_statuses = universe.quit().await;
-
-        if grpc_shutdown_trigger_tx.send(()).is_err() {
-            debug!("gRPC server shutdown signal receiver was dropped");
-        }
-        if rest_shutdown_trigger_tx.send(()).is_err() {
-            debug!("REST server shutdown signal receiver was dropped");
-        }
-        actor_exit_statuses
-    });
+    let shutdown_handle = tokio::spawn(shutdown_signal_handler(
+        shutdown_signal,
+        universe,
+        ingester_opt,
+        grpc_shutdown_trigger_tx,
+        rest_shutdown_trigger_tx,
+        cluster.clone(),
+    ));
     let grpc_join_handle = async move {
         spawn_named_task(grpc_server, "grpc_server")
             .await
@@ -801,7 +824,9 @@ pub async fn serve_quickwit(
             .context("REST server failed")
     };
 
-    if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle) {
+    let chitchat_server_handle = cluster.chitchat_server_termination_watcher().await;
+
+    if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) {
         error!("server failed: {err:?}");
     }
 
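Finally, the graceful path. The handler's new `cluster.initiate_shutdown()` call asks the gossip server to stop, which lets the termination watcher joined above resolve on a normal shutdown instead of keeping `try_join!` pending. A generic, runnable sketch of that flow, with a toy background task standing in for the chitchat server and a oneshot standing in for `initiate_shutdown`:

use std::time::Duration;

use anyhow::Context;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Toy "gossip server": runs until it is explicitly told to shut down.
    let (gossip_shutdown_tx, gossip_shutdown_rx) = oneshot::channel::<()>();
    let gossip_task = tokio::spawn(async move {
        let _ = gossip_shutdown_rx.await;
    });

    // Watcher built from the task handle, analogous to
    // `chitchat_server_termination_watcher`: it resolves once the task stops.
    let gossip_watcher = async move {
        gossip_task.await.context("gossip server task panicked")
    };

    // Toy "REST server" that finishes quickly on its own.
    let rest = async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok::<(), anyhow::Error>(())
    };

    // Analogous to `cluster.initiate_shutdown()`: without this trigger, the
    // watcher would never resolve and the join below would hang.
    let _ = gossip_shutdown_tx.send(());

    // Both futures complete, so the "node" exits cleanly.
    tokio::try_join!(rest, gossip_watcher)?;
    println!("clean exit");
    Ok(())
}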