Skip to content

Commit 3f8b453

Browse files
committed
Add redis OTEL metrics
This adds support for exporting OTEL metrics, and adds some basic monitoring of the various Redis queues.
1 parent aaec9eb commit 3f8b453

File tree

7 files changed

+277
-57
lines changed

7 files changed

+277
-57
lines changed

server/Cargo.lock

Lines changed: 14 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/svix-server/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ once_cell = "1.18.0"
3333
figment = { version = "0.10", features = ["toml", "env", "test"] }
3434
tracing = "0.1.35"
3535
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
36-
tracing-opentelemetry = "0.23.0"
37-
opentelemetry = "0.22.0"
38-
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
39-
opentelemetry-http = "0.11.0"
40-
opentelemetry-otlp = { version = "0.15.0" }
36+
tracing-opentelemetry = "0.24.0"
37+
opentelemetry = { version = "0.23.0", features = ["metrics"] }
38+
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
39+
opentelemetry-http = "0.12.0"
40+
opentelemetry-otlp = { version = "0.16.0", features = ["metrics"] }
4141
validator = { version = "0.16.0", features = ["derive"] }
4242
jwt-simple = "0.11.6"
4343
ed25519-compact = "2.1.1"
@@ -72,6 +72,7 @@ omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "75e5a9510ad33
7272
# Not a well-known author, and no longer gets updates => pinned.
7373
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
7474
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
75+
hex = "0.4.3"
7576

7677
[target.'cfg(not(target_env = "msvc"))'.dependencies]
7778
tikv-jemallocator = { version = "0.5", optional = true }

server/svix-server/src/lib.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ use std::{
1313

1414
use aide::axum::ApiRouter;
1515
use cfg::ConfigurationInner;
16+
use once_cell::sync::Lazy;
1617
use opentelemetry_otlp::WithExportConfig;
17-
use opentelemetry_sdk::runtime::Tokio;
18+
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio};
1819
use queue::TaskQueueProducer;
1920
use redis::RedisManager;
2021
use sea_orm::DatabaseConnection;
2122
use sentry::integrations::tracing::EventFilter;
23+
use svix_ksuid::{KsuidLike, KsuidMs};
2224
use tower::layer::layer_fn;
2325
use tower_http::{
2426
cors::{AllowHeaders, Any, CorsLayer},
@@ -44,6 +46,7 @@ pub mod core;
4446
pub mod db;
4547
pub mod error;
4648
pub mod expired_message_cleaner;
49+
pub mod metrics;
4750
pub mod openapi;
4851
pub mod queue;
4952
pub mod redis;
@@ -54,6 +57,9 @@ const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");
5457

5558
pub static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
5659

60+
/// Identifier for this running process, computed lazily on first access.
///
/// The value is the hex encoding of the string form of a freshly generated
/// [`KsuidMs`].
pub static INSTANCE_ID: Lazy<String> = Lazy::new(|| {
    let ksuid = KsuidMs::new(None, None);
    hex::encode(ksuid.to_string())
});
62+
5763
async fn graceful_shutdown_handler() {
5864
let ctrl_c = async {
5965
tokio::signal::ctrl_c()
@@ -83,6 +89,8 @@ async fn graceful_shutdown_handler() {
8389

8490
#[tracing::instrument(name = "app_start", level = "trace", skip_all)]
8591
pub async fn run(cfg: Configuration, listener: Option<TcpListener>) {
92+
let _metrics = setup_metrics(&cfg);
93+
8694
run_with_prefix(None, cfg, listener).await
8795
}
8896

@@ -325,6 +333,32 @@ pub fn setup_tracing(
325333
(registry, sentry_guard)
326334
}
327335

336+
pub fn setup_metrics(cfg: &ConfigurationInner) -> Option<SdkMeterProvider> {
337+
cfg.opentelemetry_address.as_ref().map(|addr| {
338+
let exporter = opentelemetry_otlp::new_exporter()
339+
.tonic()
340+
.with_endpoint(addr);
341+
342+
opentelemetry_otlp::new_pipeline()
343+
.metrics(Tokio)
344+
.with_delta_temporality()
345+
.with_exporter(exporter)
346+
.with_resource(opentelemetry_sdk::Resource::new(vec![
347+
opentelemetry::KeyValue::new(
348+
"service.name",
349+
cfg.opentelemetry_service_name.clone(),
350+
),
351+
opentelemetry::KeyValue::new("instance_id", INSTANCE_ID.to_owned()),
352+
opentelemetry::KeyValue::new(
353+
"service.version",
354+
option_env!("GITHUB_SHA").unwrap_or("unknown"),
355+
),
356+
]))
357+
.build()
358+
.unwrap()
359+
})
360+
}
361+
328362
pub fn setup_tracing_for_tests() {
329363
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
330364

server/svix-server/src/metrics/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
mod redis;
2+
3+
pub fn init_metric<T, E: std::fmt::Debug>(result: Result<T, E>) -> Option<T> {
4+
match result {
5+
Ok(t) => Some(t),
6+
Err(e) => {
7+
tracing::error!(error = ?e, "Failed to initialize metric");
8+
None
9+
}
10+
}
11+
}
12+
13+
pub use self::redis::{RedisQueueMetrics, RedisQueueType};
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use opentelemetry::metrics::{Meter, ObservableGauge};
2+
use redis::{streams::StreamPendingReply, AsyncCommands as _};
3+
4+
use super::init_metric;
5+
use crate::{
6+
error::{Error, Result},
7+
redis::RedisManager,
8+
};
9+
10+
pub enum RedisQueueType<'a> {
11+
Stream(&'a str),
12+
StreamPending { stream: &'a str, group: &'a str },
13+
List(&'a str),
14+
SortedSet(&'a str),
15+
}
16+
17+
impl<'a> RedisQueueType<'a> {
18+
pub async fn queue_depth(&self, redis: &RedisManager) -> Result<u64> {
19+
let mut conn = redis.get().await?;
20+
match self {
21+
RedisQueueType::Stream(q) => conn
22+
.xlen(q)
23+
.await
24+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
25+
RedisQueueType::StreamPending { stream, group } => {
26+
let reply: StreamPendingReply = conn.xpending(stream, group).await?;
27+
Ok(reply.count() as _)
28+
}
29+
RedisQueueType::List(q) => conn
30+
.llen(q)
31+
.await
32+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
33+
RedisQueueType::SortedSet(q) => conn
34+
.zcard(q)
35+
.await
36+
.map_err(|e| Error::queue(format!("Failed to query queue depth: {e}"))),
37+
}
38+
}
39+
}
40+
41+
#[derive(Clone)]
42+
pub struct RedisQueueMetrics {
43+
main_queue: Option<ObservableGauge<u64>>,
44+
pending_queue: Option<ObservableGauge<u64>>,
45+
delayed_queue: Option<ObservableGauge<u64>>,
46+
deadletter_queue: Option<ObservableGauge<u64>>,
47+
}
48+
49+
impl RedisQueueMetrics {
50+
pub fn new(meter: &Meter) -> Self {
51+
let main_queue = init_metric(
52+
meter
53+
.u64_observable_gauge("svix.queue.depth_main")
54+
.try_init(),
55+
);
56+
57+
let pending_queue = init_metric(
58+
meter
59+
.u64_observable_gauge("svix.queue.pending_msgs")
60+
.try_init(),
61+
);
62+
63+
let delayed_queue = init_metric(
64+
meter
65+
.u64_observable_gauge("svix.queue.depth_delayed")
66+
.try_init(),
67+
);
68+
69+
let deadletter_queue = init_metric(
70+
meter
71+
.u64_observable_gauge("svix.queue.depth_dlq")
72+
.try_init(),
73+
);
74+
75+
Self {
76+
main_queue,
77+
pending_queue,
78+
delayed_queue,
79+
deadletter_queue,
80+
}
81+
}
82+
pub async fn record(
83+
&self,
84+
redis: &RedisManager,
85+
main_queue: &RedisQueueType<'_>,
86+
pending_queue: &RedisQueueType<'_>,
87+
delayed_queue: &RedisQueueType<'_>,
88+
deadletter_queue: &RedisQueueType<'_>,
89+
) {
90+
main_queue
91+
.queue_depth(redis)
92+
.await
93+
.map(|d| self.record_main_queue_depth(d))
94+
.unwrap_or_else(|e| {
95+
tracing::warn!("Failed to record queue depth: {e}");
96+
});
97+
pending_queue
98+
.queue_depth(redis)
99+
.await
100+
.map(|d| self.record_pending_queue_depth(d))
101+
.unwrap_or_else(|e| {
102+
tracing::warn!("Failed to record queue depth: {e}");
103+
});
104+
delayed_queue
105+
.queue_depth(redis)
106+
.await
107+
.map(|d| self.record_delayed_queue_depth(d))
108+
.unwrap_or_else(|e| {
109+
tracing::warn!("Failed to record queue depth: {e}");
110+
});
111+
deadletter_queue
112+
.queue_depth(redis)
113+
.await
114+
.map(|d| self.record_deadletter_queue_depth(d))
115+
.unwrap_or_else(|e| {
116+
tracing::warn!("Failed to record queue depth: {e}");
117+
});
118+
}
119+
120+
fn record_main_queue_depth(&self, value: u64) {
121+
if let Some(recorder) = &self.main_queue {
122+
recorder.observe(value, &[]);
123+
}
124+
}
125+
fn record_pending_queue_depth(&self, value: u64) {
126+
if let Some(recorder) = &self.pending_queue {
127+
recorder.observe(value, &[]);
128+
}
129+
}
130+
fn record_delayed_queue_depth(&self, value: u64) {
131+
if let Some(recorder) = &self.delayed_queue {
132+
recorder.observe(value, &[]);
133+
}
134+
}
135+
fn record_deadletter_queue_depth(&self, value: u64) {
136+
if let Some(recorder) = &self.deadletter_queue {
137+
recorder.observe(value, &[]);
138+
}
139+
}
140+
}

server/svix-server/src/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
mod redis;
2+
mod worker;
3+
4+
pub use svix_server_core::metrics::*;
5+
6+
pub use self::{
7+
redis::{RedisQueueMetrics, RedisQueueType},
8+
worker::WorkerMetrics,
9+
};

0 commit comments

Comments
 (0)