Skip to content

Commit c4b136b

Browse files
committed
Factor out health status checks
1 parent 7997f03 commit c4b136b

File tree

6 files changed

+103
-51
lines changed

6 files changed

+103
-51
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-service/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ rust_library(
1414
"src/capabilities_server.rs",
1515
"src/cas_server.rs",
1616
"src/execution_server.rs",
17+
"src/health_server.rs",
1718
"src/lib.rs",
1819
"src/worker_api_server.rs",
1920
],
@@ -25,12 +26,15 @@ rust_library(
2526
"//nativelink-scheduler",
2627
"//nativelink-store",
2728
"//nativelink-util",
29+
"@crates//:async-lock",
2830
"@crates//:bytes",
2931
"@crates//:futures",
32+
"@crates//:hyper",
3033
"@crates//:log",
3134
"@crates//:parking_lot",
3235
"@crates//:prost",
3336
"@crates//:rand",
37+
"@crates//:serde_json5",
3438
"@crates//:tokio",
3539
"@crates//:tokio-stream",
3640
"@crates//:tonic",

nativelink-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ nativelink-util = { path = "../nativelink-util" }
1111
nativelink-store = { path = "../nativelink-store" }
1212
nativelink-scheduler = { path = "../nativelink-scheduler" }
1313

14+
async-lock = "3.3.0"
1415
bytes = "1.6.0"
1516
futures = "0.3.30"
17+
hyper = { version = "0.14.28" }
18+
serde_json5 = "0.1.0"
1619
log = "0.4.21"
1720
parking_lot = "0.12.1"
1821
prost = "0.12.3"
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2023 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use async_lock::{Mutex as AsyncMutex, MutexGuard};
18+
use futures::StreamExt;
19+
use hyper::{Response, StatusCode};
20+
use nativelink_error::Error;
21+
use nativelink_util::health_utils::{
22+
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
23+
};
24+
25+
#[derive(Clone)]
26+
pub struct HealthServer {
27+
health_registry_builder: Arc<AsyncMutex<HealthRegistryBuilder>>,
28+
}
29+
30+
impl HealthServer {
31+
pub async fn new(namespace: &str) -> Result<Self, Error> {
32+
let namespace = namespace.to_string();
33+
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
34+
namespace.into(),
35+
)));
36+
37+
Ok(HealthServer {
38+
health_registry_builder,
39+
})
40+
}
41+
42+
pub async fn get_health_registry(&self) -> Result<MutexGuard<HealthRegistryBuilder>, Error> {
43+
let health_registry_lock = self.health_registry_builder.lock().await;
44+
Ok(health_registry_lock)
45+
}
46+
47+
pub async fn check_health_status(&self, json_content_type: &'static str) -> Response<String> {
48+
let health_registry_status = self.get_health_registry().await.unwrap().build();
49+
let health_status_descriptions: Vec<HealthStatusDescription> = health_registry_status
50+
.health_status_report()
51+
.collect()
52+
.await;
53+
54+
match serde_json5::to_string(&health_status_descriptions) {
55+
Ok(body) => {
56+
let contains_failed_report = health_status_descriptions
57+
.iter()
58+
.any(|description| matches!(description.status, HealthStatus::Failed { .. }));
59+
let status_code = if contains_failed_report {
60+
StatusCode::SERVICE_UNAVAILABLE
61+
} else {
62+
StatusCode::OK
63+
};
64+
65+
Response::builder()
66+
.status(status_code)
67+
.header(
68+
hyper::header::CONTENT_TYPE,
69+
hyper::header::HeaderValue::from_static(json_content_type),
70+
)
71+
.body(body)
72+
.unwrap()
73+
}
74+
75+
Err(e) => Response::builder()
76+
.status(StatusCode::INTERNAL_SERVER_ERROR)
77+
.header(
78+
hyper::header::CONTENT_TYPE,
79+
hyper::header::HeaderValue::from_static(json_content_type),
80+
)
81+
.body(format!("Internal Failure: {e:?}"))
82+
.unwrap(),
83+
}
84+
}
85+
}

nativelink-service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ pub mod bytestream_server;
1717
pub mod capabilities_server;
1818
pub mod cas_server;
1919
pub mod execution_server;
20+
pub mod health_server;
2021
pub mod worker_api_server;

src/bin/nativelink.rs

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex;
2121
use axum::Router;
2222
use clap::Parser;
2323
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
24-
use futures::{FutureExt, StreamExt};
24+
use futures::FutureExt;
2525
use hyper::server::conn::Http;
2626
use hyper::{Response, StatusCode};
2727
use mimalloc::MiMalloc;
@@ -37,14 +37,12 @@ use nativelink_service::bytestream_server::ByteStreamServer;
3737
use nativelink_service::capabilities_server::CapabilitiesServer;
3838
use nativelink_service::cas_server::CasServer;
3939
use nativelink_service::execution_server::ExecutionServer;
40+
use nativelink_service::health_server::HealthServer;
4041
use nativelink_service::worker_api_server::WorkerApiServer;
4142
use nativelink_store::default_store_factory::store_factory;
4243
use nativelink_store::store_manager::StoreManager;
4344
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
4445
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
45-
use nativelink_util::health_utils::{
46-
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
47-
};
4846
use nativelink_util::metrics_utils::{
4947
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent,
5048
Registry,
@@ -104,13 +102,11 @@ async fn inner_main(
104102
server_start_timestamp: u64,
105103
) -> Result<(), Box<dyn std::error::Error>> {
106104
let mut root_metrics_registry = <Registry>::with_prefix("nativelink");
107-
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
108-
"nativelink".into(),
109-
)));
105+
let health_server = HealthServer::new("nativelink").await?;
110106

111107
let store_manager = Arc::new(StoreManager::new());
112108
{
113-
let mut health_registry_lock = health_registry_builder.lock().await;
109+
let mut health_registry_lock = health_server.get_health_registry().await?;
114110
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");
115111

116112
for (name, store_cfg) in cfg.stores {
@@ -397,7 +393,7 @@ async fn inner_main(
397393
);
398394

399395
let root_metrics_registry = root_metrics_registry.clone();
400-
let health_registry_status = health_registry_builder.lock().await.build();
396+
let health_server_cloned = health_server.clone();
401397

402398
let mut svc = Router::new()
403399
// This is the default service that executes if no other endpoint matches.
@@ -412,48 +408,9 @@ async fn inner_main(
412408
}
413409

414410
spawn_blocking(move || {
415-
futures::executor::block_on(async {
416-
let health_status_descriptions: Vec<HealthStatusDescription> =
417-
health_registry_status
418-
.health_status_report()
419-
.collect()
420-
.await;
421-
422-
match serde_json5::to_string(&health_status_descriptions) {
423-
Ok(body) => {
424-
let contains_failed_report =
425-
health_status_descriptions.iter().any(|description| {
426-
matches!(
427-
description.status,
428-
HealthStatus::Failed { .. }
429-
)
430-
});
431-
let status_code = if contains_failed_report {
432-
StatusCode::SERVICE_UNAVAILABLE
433-
} else {
434-
StatusCode::OK
435-
};
436-
Response::builder()
437-
.status(status_code)
438-
.header(
439-
hyper::header::CONTENT_TYPE,
440-
hyper::header::HeaderValue::from_static(
441-
JSON_CONTENT_TYPE,
442-
),
443-
)
444-
.body(body)
445-
.unwrap()
446-
}
447-
Err(e) => Response::builder()
448-
.status(StatusCode::INTERNAL_SERVER_ERROR)
449-
.header(
450-
hyper::header::CONTENT_TYPE,
451-
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
452-
)
453-
.body(format!("Internal Failure: {e:?}"))
454-
.unwrap(),
455-
}
456-
})
411+
futures::executor::block_on(
412+
health_server_cloned.check_health_status(JSON_CONTENT_TYPE),
413+
)
457414
})
458415
.await
459416
.unwrap_or_else(error_to_response)

0 commit comments

Comments
 (0)