Skip to content

Exit when chitchat server fails #5819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ binggan = { version = "0.14" }
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3", features = ["serde"] }
bytestring = "1.4"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "13968f0" }
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "bd54c81" }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
Expand Down
29 changes: 22 additions & 7 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,21 @@ impl Cluster {
}
}

/// Leaves the cluster.
pub async fn shutdown(self) {
#[cfg(any(test, feature = "testsuite"))]
/// Marks this node as not ready and waits long enough for the change to be
/// gossiped to the rest of the cluster.
pub async fn leave(&self) {
    info!(
        cluster_id=%self.cluster_id,
        node_id=%self.self_chitchat_id.node_id,
        "leaving the cluster"
    );
    self.set_self_node_readiness(false).await;
    // Two gossip intervals give peers a chance to observe the not-ready state.
    let grace_period = self.gossip_interval * 2;
    tokio::time::sleep(grace_period).await;
}

/// Asks the underlying chitchat server to begin shutting down.
pub async fn initiate_shutdown(&self) -> anyhow::Result<()> {
    let inner = self.inner.read().await;
    inner.chitchat_handle.initiate_shutdown()
}

/// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines.
/// The metrics are exposed as follows:
/// Key: pipeline_metrics:<index_uid>:<source_id>
Expand Down Expand Up @@ -433,6 +437,16 @@ impl Cluster {
/// Returns a shared handle to the underlying chitchat instance.
pub async fn chitchat(&self) -> Arc<Mutex<Chitchat>> {
    let inner = self.inner.read().await;
    inner.chitchat_handle.chitchat()
}

/// Returns a future that resolves when the chitchat gossip server terminates.
///
/// The `use<>` capture clause makes the returned future capture nothing, so
/// callers may hold on to it after the borrow of `self` ends.
pub async fn chitchat_server_termination_watcher(
    &self,
) -> impl Future<Output = anyhow::Result<()>> + use<> {
    let inner = self.inner.read().await;
    inner.chitchat_handle.termination_watcher()
}
}

impl ClusterChangeStreamFactory for Cluster {
Expand Down Expand Up @@ -834,7 +848,7 @@ mod tests {
self_node_state.get(READINESS_KEY).unwrap(),
READINESS_VALUE_NOT_READY
);
node.shutdown().await;
node.leave().await;
}

#[tokio::test]
Expand Down Expand Up @@ -871,19 +885,20 @@ mod tests {
expected_members.sort();
assert_eq!(members, expected_members);

node_2.shutdown().await;
node_2.leave().await;
node_1
.wait_for_ready_members(|members| members.len() == 2, wait_secs)
.await
.unwrap();

node_3.shutdown().await;
node_3.leave().await;
node_1
.wait_for_ready_members(|members| members.len() == 1, wait_secs)
.await
.unwrap();

node_1.shutdown().await;
node_1.leave().await;
drop(node_1);

let cluster_changes: Vec<ClusterChange> = node_1_change_stream.collect().await;
assert_eq!(cluster_changes.len(), 6);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ async fn test_scheduler_scheduling_multiple_indexers() {
assert_eq!(scheduler_state.num_schedule_indexing_plan, 1);

// Shutdown cluster and wait until the new scheduling.
cluster_indexer_2.shutdown().await;
cluster_indexer_2.leave().await;

cluster
.wait_for_ready_members(
Expand Down
69 changes: 47 additions & 22 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,41 @@ fn start_shard_positions_service(
});
}

/// Waits for the shutdown signal and notifies all other services when it
/// occurs.
///
/// Usually called when receiving a SIGTERM signal, e.g. k8s trying to
/// decommission a pod.
///
/// Returns the exit statuses of all actors in the universe, keyed by actor
/// name, so the caller can report how the shutdown went.
async fn shutdown_signal_handler(
    shutdown_signal: BoxFutureInfaillible<()>,
    universe: Universe,
    ingester_opt: Option<Ingester>,
    grpc_shutdown_trigger_tx: oneshot::Sender<()>,
    rest_shutdown_trigger_tx: oneshot::Sender<()>,
    cluster: Cluster,
) -> HashMap<String, ActorExitStatus> {
    shutdown_signal.await;
    // We must decommission the ingester first before terminating the indexing pipelines that
    // may consume from it. We also need to keep the gRPC server running while doing so.
    if let Some(ingester) = ingester_opt {
        if let Err(error) = wait_for_ingester_decommission(ingester).await {
            error!("failed to decommission ingester gracefully: {:?}", error);
        }
    }
    let actor_exit_statuses = universe.quit().await;

    // Best effort: the receivers may already be gone if the servers exited on
    // their own (e.g. because of an earlier failure).
    if grpc_shutdown_trigger_tx.send(()).is_err() {
        debug!("gRPC server shutdown signal receiver was dropped");
    }
    if rest_shutdown_trigger_tx.send(()).is_err() {
        debug!("REST server shutdown signal receiver was dropped");
    }
    // Stop the chitchat server so its termination watcher resolves and the
    // serve loop can exit.
    if let Err(err) = cluster.initiate_shutdown().await {
        debug!("failed to initiate chitchat server shutdown: {err}");
    }
    actor_exit_statuses
}

pub async fn serve_quickwit(
node_config: NodeConfig,
runtimes_config: RuntimesConfig,
Expand Down Expand Up @@ -757,7 +792,7 @@ pub async fn serve_quickwit(
// Thus readiness task is started once gRPC and REST servers are started.
spawn_named_task(
node_readiness_reporting_task(
cluster,
cluster.clone(),
metastore_through_control_plane,
ingester_opt.clone(),
grpc_readiness_signal_rx,
Expand All @@ -767,26 +802,14 @@ pub async fn serve_quickwit(
"node_readiness_reporting",
);

let shutdown_handle = tokio::spawn(async move {
shutdown_signal.await;

// We must decommission the ingester first before terminating the indexing pipelines that
// may consume from it. We also need to keep the gRPC server running while doing so.
if let Some(ingester) = ingester_opt {
if let Err(error) = wait_for_ingester_decommission(ingester).await {
error!("failed to decommission ingester gracefully: {:?}", error);
}
}
let actor_exit_statuses = universe.quit().await;

if grpc_shutdown_trigger_tx.send(()).is_err() {
debug!("gRPC server shutdown signal receiver was dropped");
}
if rest_shutdown_trigger_tx.send(()).is_err() {
debug!("REST server shutdown signal receiver was dropped");
}
actor_exit_statuses
});
let shutdown_handle = tokio::spawn(shutdown_signal_handler(
shutdown_signal,
universe,
ingester_opt,
grpc_shutdown_trigger_tx,
rest_shutdown_trigger_tx,
cluster.clone(),
));
let grpc_join_handle = async move {
spawn_named_task(grpc_server, "grpc_server")
.await
Expand All @@ -801,7 +824,9 @@ pub async fn serve_quickwit(
.context("REST server failed")
};

if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle) {
let chitchat_server_handle = cluster.chitchat_server_termination_watcher().await;

if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) {
error!("server failed: {err:?}");
}

Expand Down