Skip to content

Commit 75f243a

Browse files
committed
Add DLQ support
Add support for moving failed messages to deadletter queue in Redis. The queue can be re-driven via an undocumented endpoint, `/api/v1/admin/redrive-dlq`.
1 parent 48ab14d commit 75f243a

File tree

9 files changed

+197
-11
lines changed

9 files changed

+197
-11
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,10 @@ To support graceful shutdown on the server, all running tasks are finished befor
389389
One of our main goals with open sourcing the Svix dispatcher is ease of use. The hosted Svix service, however, is quite complex due to our scale and the infrastructure it requires. This complexity is not useful for the vast majority of people and would make this project much harder to use and much more limited.
390390
This is why this code has been adjusted before being released, and some of the features, optimizations, and behaviors supported by the hosted dispatcher are not yet available in this repo. With that being said, other than some known incompatibilities, the internal Svix test suite passes. This means they are already mostly compatible, and we are working hard on bringing them to full feature parity.
391391

392+
# Re-driving Redis DLQ
393+
We have an undocumented endpoint for re-driving failed messages that are DLQ'ed. You can do this by calling `POST /api/v1/admin/redrive-dlq/`.
394+
395+
To monitor the DLQ depth, you should monitor the `svix.queue.depth_dlq` metric. Any non-zero values indicate that there is data in the DLQ.
392396

393397
# Development
394398

server/svix-server/src/cfg.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ const DEFAULTS: &str = include_str!("../config.default.toml");
6969

7070
pub type Configuration = Arc<ConfigurationInner>;
7171

72+
fn default_redis_pending_duration_secs() -> u64 {
73+
45
74+
}
75+
7276
#[derive(Clone, Debug, Deserialize, Validate)]
7377
#[validate(
7478
schema(function = "validate_config_complete"),
@@ -200,6 +204,9 @@ pub struct ConfigurationInner {
200204
#[serde(flatten)]
201205
pub proxy_config: Option<ProxyConfig>,
202206

207+
#[serde(default = "default_redis_pending_duration_secs")]
208+
pub redis_pending_duration_secs: u64,
209+
203210
#[serde(flatten)]
204211
pub internal: InternalConfig,
205212
}

server/svix-server/src/metrics/redis.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct RedisQueueMetrics {
4343
main_queue: Option<ObservableGauge<u64>>,
4444
pending_queue: Option<ObservableGauge<u64>>,
4545
delayed_queue: Option<ObservableGauge<u64>>,
46+
deadletter_queue: Option<ObservableGauge<u64>>,
4647
}
4748

4849
impl RedisQueueMetrics {
@@ -65,10 +66,17 @@ impl RedisQueueMetrics {
6566
.try_init(),
6667
);
6768

69+
let deadletter_queue = init_metric(
70+
meter
71+
.u64_observable_gauge("svix.queue.depth_dlq")
72+
.try_init(),
73+
);
74+
6875
Self {
6976
main_queue,
7077
pending_queue,
7178
delayed_queue,
79+
deadletter_queue,
7280
}
7381
}
7482
pub async fn record(
@@ -77,6 +85,7 @@ impl RedisQueueMetrics {
7785
main_queue: &RedisQueueType<'_>,
7886
pending_queue: &RedisQueueType<'_>,
7987
delayed_queue: &RedisQueueType<'_>,
88+
deadletter_queue: &RedisQueueType<'_>,
8089
) {
8190
main_queue
8291
.queue_depth(redis)
@@ -99,6 +108,13 @@ impl RedisQueueMetrics {
99108
.unwrap_or_else(|e| {
100109
tracing::warn!("Failed to record queue depth: {e}");
101110
});
111+
deadletter_queue
112+
.queue_depth(redis)
113+
.await
114+
.map(|d| self.record_deadletter_queue_depth(d))
115+
.unwrap_or_else(|e| {
116+
tracing::warn!("Failed to record queue depth: {e}");
117+
});
102118
}
103119

104120
fn record_main_queue_depth(&self, value: u64) {
@@ -116,4 +132,9 @@ impl RedisQueueMetrics {
116132
recorder.observe(value, &[]);
117133
}
118134
}
135+
fn record_deadletter_queue_depth(&self, value: u64) {
136+
if let Some(recorder) = &self.deadletter_queue {
137+
recorder.observe(value, &[]);
138+
}
139+
}
119140
}

server/svix-server/src/queue/redis.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
3131

32-
use omniqueue::backends::{RedisBackend, RedisConfig};
32+
use omniqueue::backends::{redis::DeadLetterQueueConfig, RedisBackend, RedisConfig};
3333
use redis::{AsyncCommands as _, RedisResult};
3434

3535
use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer};
@@ -51,6 +51,9 @@ const DELAYED: &str = "{queue}_svix_delayed";
5151
/// The key for the lock guarding the delayed queue background task.
5252
const DELAYED_LOCK: &str = "{queue}_svix_delayed_lock";
5353

54+
/// The key for the DLQ
55+
const DLQ: &str = "{queue}_svix_dlq";
56+
5457
// v2 KEY CONSTANTS
5558
const LEGACY_V2_MAIN: &str = "{queue}_svix_main";
5659
const LEGACY_V2_PROCESSING: &str = "{queue}_svix_processing";
@@ -82,11 +85,12 @@ pub async fn new_pair(
8285
) -> (TaskQueueProducer, TaskQueueConsumer) {
8386
new_pair_inner(
8487
cfg,
85-
Duration::from_secs(45),
88+
Duration::from_secs(cfg.redis_pending_duration_secs),
8689
prefix.unwrap_or_default(),
8790
MAIN,
8891
DELAYED,
8992
DELAYED_LOCK,
93+
DLQ,
9094
)
9195
.await
9296
}
@@ -125,10 +129,12 @@ async fn new_pair_inner(
125129
main_queue_name: &'static str,
126130
delayed_queue_name: &'static str,
127131
delayed_lock_name: &'static str,
132+
dlq_name: &'static str,
128133
) -> (TaskQueueProducer, TaskQueueConsumer) {
129134
let main_queue_name = format!("{queue_prefix}{main_queue_name}");
130135
let delayed_queue_name = format!("{queue_prefix}{delayed_queue_name}");
131136
let delayed_lock_name = format!("{queue_prefix}{delayed_lock_name}");
137+
let dlq_name = format!("{queue_prefix}{dlq_name}");
132138

133139
// This fn is only called from
134140
// - `queue::new_pair` if the queue type is redis and a DSN is set
@@ -205,6 +211,7 @@ async fn new_pair_inner(
205211
let pool = pool.clone();
206212
let main_queue_name = main_queue_name.clone();
207213
let delayed_queue_name = delayed_queue_name.clone();
214+
let deadletter_queue_name = dlq_name.clone();
208215

209216
async move {
210217
let mut interval = tokio::time::interval(Duration::from_secs(1));
@@ -214,12 +221,19 @@ async fn new_pair_inner(
214221
group: WORKERS_GROUP,
215222
};
216223
let delayed_queue = RedisQueueType::SortedSet(&delayed_queue_name);
224+
let deadletter_queue = RedisQueueType::List(&deadletter_queue_name);
217225
let metrics =
218226
crate::metrics::RedisQueueMetrics::new(&opentelemetry::global::meter("svix.com"));
219227
loop {
220228
interval.tick().await;
221229
metrics
222-
.record(&pool, &main_queue, &pending, &delayed_queue)
230+
.record(
231+
&pool,
232+
&main_queue,
233+
&pending,
234+
&delayed_queue,
235+
&deadletter_queue,
236+
)
223237
.await;
224238
}
225239
}
@@ -236,7 +250,10 @@ async fn new_pair_inner(
236250
consumer_name: WORKER_CONSUMER.to_owned(),
237251
payload_key: QUEUE_KV_KEY.to_owned(),
238252
ack_deadline_ms: pending_duration,
239-
dlq_config: None,
253+
dlq_config: Some(DeadLetterQueueConfig {
254+
queue_key: dlq_name.to_string(),
255+
max_receives: 3,
256+
}),
240257
sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()),
241258
};
242259

@@ -502,6 +519,7 @@ pub mod tests {
502519
"{test}_idle_period",
503520
"{test}_idle_period_delayed",
504521
"{test}_idle_period_delayed_lock",
522+
"{test}_dlq",
505523
)
506524
.await;
507525

@@ -572,6 +590,7 @@ pub mod tests {
572590
"{test}_ack",
573591
"{test}_ack_delayed",
574592
"{test}_ack_delayed_lock",
593+
"{test}_dlq",
575594
)
576595
.await;
577596

@@ -618,6 +637,7 @@ pub mod tests {
618637
"{test}_nack",
619638
"{test}_nack_delayed",
620639
"{test}_nack_delayed_lock",
640+
"{test}_dlq",
621641
)
622642
.await;
623643

@@ -661,6 +681,7 @@ pub mod tests {
661681
"{test}_delay",
662682
"{test}_delay_delayed",
663683
"{test}_delay_delayed_lock",
684+
"{test}_dlq",
664685
)
665686
.await;
666687

@@ -834,6 +855,7 @@ pub mod tests {
834855
v3_main,
835856
v2_delayed,
836857
v2_delayed_lock,
858+
"dlq-bruh",
837859
)
838860
.await;
839861

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use aide::{
2+
axum::{routing::post_with, ApiRouter},
3+
transform::TransformPathItem,
4+
};
5+
use axum::extract::State;
6+
use svix_server_derive::aide_annotate;
7+
8+
use crate::{core::permissions, error::Result, v1::utils::NoContent, AppState};
9+
10+
/// Redrive DLQ
11+
#[aide_annotate(op_id = "v1.admin.redrive-dlq")]
12+
pub async fn redrive_dlq(
13+
State(AppState { queue_tx, .. }): State<AppState>,
14+
_: permissions::Organization,
15+
) -> Result<NoContent> {
16+
if let Err(e) = queue_tx.redrive_dlq().await {
17+
tracing::warn!(error = ?e, "DLQ redrive failed");
18+
}
19+
Ok(NoContent)
20+
}
21+
22+
pub fn router() -> ApiRouter<AppState> {
23+
ApiRouter::new().api_route_with(
24+
"/admin/redrive-dlq",
25+
post_with(redrive_dlq, redrive_dlq_operation),
26+
move |op: TransformPathItem<'_>| op.tag("Admin".as_ref()).hidden(true),
27+
)
28+
}

server/svix-server/src/v1/endpoints/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-FileCopyrightText: © 2022 Svix Authors
22
// SPDX-License-Identifier: MIT
33

4+
pub mod admin;
45
pub mod application;
56
pub mod attempt;
67
pub mod auth;

server/svix-server/src/v1/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub fn router() -> ApiRouter<AppState> {
2121
.merge(endpoints::event_type::router())
2222
.merge(endpoints::message::router())
2323
.merge(endpoints::attempt::router())
24+
.merge(endpoints::admin::router())
2425
.layer(
2526
TraceLayer::new_for_http()
2627
.make_span_with(AxumOtelSpanCreator)

server/svix-server/tests/it/redis_queue.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,27 @@
66
77
use std::{str::FromStr, time::Duration};
88

9+
use http::StatusCode;
910
use redis::AsyncCommands as _;
11+
use svix_ksuid::KsuidLike;
1012
use svix_server::{
1113
cfg::Configuration,
12-
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
14+
core::types::{
15+
ApplicationId, BaseId, EndpointId, MessageAttemptTriggerType, MessageId, OrganizationId,
16+
},
1317
queue::{
1418
new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
1519
},
1620
redis::RedisManager,
21+
v1::endpoints::message::MessageOut,
1722
};
1823
use tokio::time::timeout;
1924

25+
use crate::utils::{
26+
common_calls::{create_test_app, create_test_endpoint, message_in},
27+
get_default_test_config, start_svix_server_with_cfg_and_org_id_and_prefix,
28+
};
29+
2030
// TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
2131
async fn get_pool(cfg: &Configuration) -> RedisManager {
2232
RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
@@ -148,3 +158,90 @@ async fn test_many_queue_consumers_delayed() {
148158
)
149159
.await;
150160
}
161+
162+
#[tokio::test]
163+
#[ignore]
164+
async fn test_redis_streams_dlq() {
165+
let mut cfg = get_default_test_config();
166+
cfg.worker_enabled = false;
167+
cfg.redis_pending_duration_secs = 1;
168+
169+
let cfg = std::sync::Arc::new(cfg);
170+
let prefix = svix_ksuid::Ksuid::new(None, None).to_string();
171+
172+
let pool = get_pool(&cfg).await;
173+
let mut conn = pool.get().await.unwrap();
174+
175+
let _: () = conn
176+
.del(format!("{prefix}{{queue}}_svix_v3_main"))
177+
.await
178+
.unwrap();
179+
180+
let _: () = conn
181+
.del(format!("{prefix}{{queue}}_svix_dlq"))
182+
.await
183+
.unwrap();
184+
185+
let (client, _jh) = start_svix_server_with_cfg_and_org_id_and_prefix(
186+
&cfg,
187+
OrganizationId::new(None, None),
188+
prefix.clone(),
189+
)
190+
.await;
191+
192+
let app_id = create_test_app(&client, "v1MessageCRTestApp")
193+
.await
194+
.unwrap()
195+
.id;
196+
197+
let _endp_id = create_test_endpoint(&client, &app_id, "http://localhost:2/bad/url/")
198+
.await
199+
.unwrap()
200+
.id;
201+
202+
let _message_1: MessageOut = client
203+
.post(
204+
&format!("api/v1/app/{app_id}/msg/"),
205+
message_in(&app_id, serde_json::json!({"test": "value"})).unwrap(),
206+
StatusCode::ACCEPTED,
207+
)
208+
.await
209+
.unwrap();
210+
211+
let (_p, mut c) = new_pair(&cfg, Some(&prefix)).await;
212+
213+
let wait_time = std::time::Duration::from_millis(1_500);
214+
for _ in 0..3 {
215+
let res = c.receive_all(wait_time).await.unwrap();
216+
assert!(!res.is_empty());
217+
for j in res {
218+
j.nack().await.unwrap();
219+
}
220+
}
221+
222+
let res = c.receive_all(wait_time).await.unwrap();
223+
assert!(res.is_empty());
224+
225+
tokio::time::sleep(wait_time).await;
226+
227+
// Redrive
228+
client
229+
.post_without_response(
230+
"/api/v1/admin/redrive-dlq",
231+
serde_json::Value::Null,
232+
StatusCode::NO_CONTENT,
233+
)
234+
.await
235+
.unwrap();
236+
237+
for _ in 0..3 {
238+
let res = c.receive_all(wait_time).await.unwrap();
239+
assert!(!res.is_empty());
240+
for j in res {
241+
j.nack().await.unwrap();
242+
}
243+
}
244+
245+
let res = c.receive_all(wait_time).await.unwrap();
246+
assert!(res.is_empty());
247+
}

0 commit comments

Comments
 (0)