Skip to content

Commit 52dffb2

Browse files
blizzardc0dersteedmicro
authored andcommitted
Factor out health status checks to own service
1 parent aadb2b9 commit 52dffb2

File tree

8 files changed

+140
-64
lines changed

8 files changed

+140
-64
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/examples/basic_cas.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@
154154
"worker_api": {
155155
"scheduler": "MAIN_SCHEDULER",
156156
},
157-
"admin": {}
157+
"admin": {},
158+
"health": {},
158159
}
159160
}],
160161
"global": {

nativelink-config/src/cas_server.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ pub struct AdminConfig {
184184
pub path: String,
185185
}
186186

187+
#[derive(Deserialize, Debug, Default)]
188+
#[serde(deny_unknown_fields)]
189+
pub struct HealthConfig {
190+
/// Path to register the health status check. If path is "/status", and your
191+
/// domain is "example.com", you can reach the endpoint with:
192+
/// <http://example.com/status>.
193+
///
194+
/// Default: "/status"
195+
#[serde(default)]
196+
pub path: String,
197+
}
198+
187199
#[derive(Deserialize, Debug)]
188200
#[serde(deny_unknown_fields)]
189201
pub struct ServicesConfig {
@@ -228,6 +240,9 @@ pub struct ServicesConfig {
228240
/// This is the service for any administrative tasks.
229241
/// It provides a REST API endpoint for administrative purposes.
230242
pub admin: Option<AdminConfig>,
243+
244+
/// This is the service for health status check.
245+
pub health: Option<HealthConfig>,
231246
}
232247

233248
#[derive(Deserialize, Debug)]

nativelink-service/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ rust_library(
1414
"src/capabilities_server.rs",
1515
"src/cas_server.rs",
1616
"src/execution_server.rs",
17+
"src/health_server.rs",
1718
"src/lib.rs",
1819
"src/worker_api_server.rs",
1920
],
@@ -25,12 +26,15 @@ rust_library(
2526
"//nativelink-scheduler",
2627
"//nativelink-store",
2728
"//nativelink-util",
29+
"@crates//:async-lock",
2830
"@crates//:bytes",
2931
"@crates//:futures",
32+
"@crates//:hyper",
3033
"@crates//:log",
3134
"@crates//:parking_lot",
3235
"@crates//:prost",
3336
"@crates//:rand",
37+
"@crates//:serde_json5",
3438
"@crates//:tokio",
3539
"@crates//:tokio-stream",
3640
"@crates//:tonic",

nativelink-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ nativelink-util = { path = "../nativelink-util" }
1111
nativelink-store = { path = "../nativelink-store" }
1212
nativelink-scheduler = { path = "../nativelink-scheduler" }
1313

14+
async-lock = "3.3.0"
1415
bytes = "1.6.0"
1516
futures = "0.3.30"
17+
hyper = { version = "0.14.28" }
18+
serde_json5 = "0.1.0"
1619
log = "0.4.21"
1720
parking_lot = "0.12.1"
1821
prost = "0.12.4"
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2024 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use async_lock::{Mutex as AsyncMutex, MutexGuard};
18+
use futures::StreamExt;
19+
use hyper::{Response, StatusCode};
20+
use nativelink_error::Error;
21+
use nativelink_util::health_utils::{
22+
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
23+
};
24+
25+
/// Content type header value for JSON.
26+
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
27+
28+
#[derive(Clone)]
29+
pub struct HealthServer {
30+
health_registry_builder: Arc<AsyncMutex<HealthRegistryBuilder>>,
31+
}
32+
33+
impl HealthServer {
34+
pub async fn new(namespace: &str) -> Result<Self, Error> {
35+
let namespace = namespace.to_string();
36+
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
37+
namespace.into(),
38+
)));
39+
40+
Ok(HealthServer {
41+
health_registry_builder,
42+
})
43+
}
44+
45+
pub async fn get_health_registry(&self) -> Result<MutexGuard<HealthRegistryBuilder>, Error> {
46+
let health_registry_lock = self.health_registry_builder.lock().await;
47+
Ok(health_registry_lock)
48+
}
49+
50+
pub async fn check_health_status(&self) -> Response<String> {
51+
let health_registry_status = self.get_health_registry().await.unwrap().build();
52+
let health_status_descriptions: Vec<HealthStatusDescription> = health_registry_status
53+
.health_status_report()
54+
.collect()
55+
.await;
56+
57+
match serde_json5::to_string(&health_status_descriptions) {
58+
Ok(body) => {
59+
let contains_failed_report = health_status_descriptions
60+
.iter()
61+
.any(|description| matches!(description.status, HealthStatus::Failed { .. }));
62+
let status_code = if contains_failed_report {
63+
StatusCode::SERVICE_UNAVAILABLE
64+
} else {
65+
StatusCode::OK
66+
};
67+
68+
Response::builder()
69+
.status(status_code)
70+
.header(
71+
hyper::header::CONTENT_TYPE,
72+
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
73+
)
74+
.body(body)
75+
.unwrap()
76+
}
77+
78+
Err(e) => Response::builder()
79+
.status(StatusCode::INTERNAL_SERVER_ERROR)
80+
.header(
81+
hyper::header::CONTENT_TYPE,
82+
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
83+
)
84+
.body(format!("Internal Failure: {e:?}"))
85+
.unwrap(),
86+
}
87+
}
88+
}

nativelink-service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ pub mod bytestream_server;
1717
pub mod capabilities_server;
1818
pub mod cas_server;
1919
pub mod execution_server;
20+
pub mod health_server;
2021
pub mod worker_api_server;

src/bin/nativelink.rs

Lines changed: 25 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex;
2121
use axum::Router;
2222
use clap::Parser;
2323
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
24-
use futures::{FutureExt, StreamExt};
24+
use futures::FutureExt;
2525
use hyper::server::conn::Http;
2626
use hyper::{Response, StatusCode};
2727
use mimalloc::MiMalloc;
@@ -37,14 +37,12 @@ use nativelink_service::bytestream_server::ByteStreamServer;
3737
use nativelink_service::capabilities_server::CapabilitiesServer;
3838
use nativelink_service::cas_server::CasServer;
3939
use nativelink_service::execution_server::ExecutionServer;
40+
use nativelink_service::health_server::HealthServer;
4041
use nativelink_service::worker_api_server::WorkerApiServer;
4142
use nativelink_store::default_store_factory::store_factory;
4243
use nativelink_store::store_manager::StoreManager;
4344
use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
4445
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
45-
use nativelink_util::health_utils::{
46-
HealthRegistryBuilder, HealthStatus, HealthStatusDescription, HealthStatusReporter,
47-
};
4846
use nativelink_util::metrics_utils::{
4947
set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent,
5048
Registry,
@@ -79,12 +77,12 @@ const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics";
7977
/// Note: This must be kept in sync with the documentation in `AdminConfig::path`.
8078
const DEFAULT_ADMIN_API_PATH: &str = "/admin";
8179

80+
// Note: This must be kept in sync with the documentation in `HealthConfig::path`.
81+
const DEFAULT_HEALTH_STATUS_CHECK_PATH: &str = "/status";
82+
8283
/// Name of environment variable to disable metrics.
8384
const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS";
8485

85-
/// Content type header value for JSON.
86-
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
87-
8886
/// Backend for bazel remote execution / cache API.
8987
#[derive(Parser, Debug)]
9088
#[clap(
@@ -104,13 +102,11 @@ async fn inner_main(
104102
server_start_timestamp: u64,
105103
) -> Result<(), Box<dyn std::error::Error>> {
106104
let mut root_metrics_registry = <Registry>::with_prefix("nativelink");
107-
let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new(
108-
"nativelink".into(),
109-
)));
105+
let health_server = HealthServer::new("nativelink").await?;
110106

111107
let store_manager = Arc::new(StoreManager::new());
112108
{
113-
let mut health_registry_lock = health_registry_builder.lock().await;
109+
let mut health_registry_lock = health_server.get_health_registry().await?;
114110
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");
115111

116112
for (name, store_cfg) in cfg.stores {
@@ -397,68 +393,34 @@ async fn inner_main(
397393
);
398394

399395
let root_metrics_registry = root_metrics_registry.clone();
400-
let health_registry_status = health_registry_builder.lock().await.build();
401396

402397
let mut svc = Router::new()
403398
// This is the default service that executes if no other endpoint matches.
404-
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")))
405-
.route_service(
406-
"/status",
407-
axum::routing::get(move || async move {
408-
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
409-
let mut response = Response::new(format!("Error: {e:?}"));
410-
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
411-
response
412-
}
399+
.fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}")));
413400

401+
if let Some(health_cfg) = services.health {
402+
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {
403+
let mut response = Response::new(format!("Error: {e:?}"));
404+
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
405+
response
406+
}
407+
let path = if health_cfg.path.is_empty() {
408+
DEFAULT_HEALTH_STATUS_CHECK_PATH
409+
} else {
410+
&health_cfg.path
411+
};
412+
let health_server_cloned = health_server.clone();
413+
svc = svc.route_service(
414+
path,
415+
axum::routing::get(move || async move {
414416
spawn_blocking(move || {
415-
futures::executor::block_on(async {
416-
let health_status_descriptions: Vec<HealthStatusDescription> =
417-
health_registry_status
418-
.health_status_report()
419-
.collect()
420-
.await;
421-
422-
match serde_json5::to_string(&health_status_descriptions) {
423-
Ok(body) => {
424-
let contains_failed_report =
425-
health_status_descriptions.iter().any(|description| {
426-
matches!(
427-
description.status,
428-
HealthStatus::Failed { .. }
429-
)
430-
});
431-
let status_code = if contains_failed_report {
432-
StatusCode::SERVICE_UNAVAILABLE
433-
} else {
434-
StatusCode::OK
435-
};
436-
Response::builder()
437-
.status(status_code)
438-
.header(
439-
hyper::header::CONTENT_TYPE,
440-
hyper::header::HeaderValue::from_static(
441-
JSON_CONTENT_TYPE,
442-
),
443-
)
444-
.body(body)
445-
.unwrap()
446-
}
447-
Err(e) => Response::builder()
448-
.status(StatusCode::INTERNAL_SERVER_ERROR)
449-
.header(
450-
hyper::header::CONTENT_TYPE,
451-
hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE),
452-
)
453-
.body(format!("Internal Failure: {e:?}"))
454-
.unwrap(),
455-
}
456-
})
417+
futures::executor::block_on(health_server_cloned.check_health_status())
457418
})
458419
.await
459420
.unwrap_or_else(error_to_response)
460421
}),
461422
);
423+
}
462424

463425
if let Some(prometheus_cfg) = services.experimental_prometheus {
464426
fn error_to_response<E: std::error::Error>(e: E) -> Response<String> {

0 commit comments

Comments
 (0)