Skip to content

Commit bc06ac8

Browse files
committed
Add test
1 parent 37b9d0c commit bc06ac8

File tree

4 files changed

+117
-8
lines changed

4 files changed

+117
-8
lines changed

server/svix-server/src/cfg.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ const DEFAULTS: &str = include_str!("../config.default.toml");
6969

7070
pub type Configuration = Arc<ConfigurationInner>;
7171

72+
fn default_redis_pending_duration_secs() -> u64 {
73+
45
74+
}
75+
7276
#[derive(Clone, Debug, Deserialize, Validate)]
7377
#[validate(
7478
schema(function = "validate_config_complete"),
@@ -200,6 +204,9 @@ pub struct ConfigurationInner {
200204
#[serde(flatten)]
201205
pub proxy_config: Option<ProxyConfig>,
202206

207+
#[serde(default = "default_redis_pending_duration_secs")]
208+
pub redis_pending_duration_secs: u64,
209+
203210
#[serde(flatten)]
204211
pub internal: InternalConfig,
205212
}

server/svix-server/src/queue/redis.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub async fn new_pair(
8484
) -> (TaskQueueProducer, TaskQueueConsumer) {
8585
new_pair_inner(
8686
cfg,
87-
Duration::from_secs(45),
87+
Duration::from_secs(cfg.redis_pending_duration_secs),
8888
prefix.unwrap_or_default(),
8989
MAIN,
9090
DELAYED,

server/svix-server/tests/it/redis_queue.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,27 @@
66
77
use std::{str::FromStr, time::Duration};
88

9+
use http::StatusCode;
910
use redis::AsyncCommands as _;
11+
use svix_ksuid::KsuidLike;
1012
use svix_server::{
1113
cfg::Configuration,
12-
core::types::{ApplicationId, EndpointId, MessageAttemptTriggerType, MessageId},
14+
core::types::{
15+
ApplicationId, BaseId, EndpointId, MessageAttemptTriggerType, MessageId, OrganizationId,
16+
},
1317
queue::{
1418
new_pair, MessageTask, QueueTask, TaskQueueConsumer, TaskQueueDelivery, TaskQueueProducer,
1519
},
1620
redis::RedisManager,
21+
v1::endpoints::message::MessageOut,
1722
};
1823
use tokio::time::timeout;
1924

25+
use crate::utils::{
26+
common_calls::{create_test_app, create_test_endpoint, message_in},
27+
get_default_test_config, start_svix_server_with_cfg_and_org_id_and_prefix,
28+
};
29+
2030
// TODO: Don't copy this from the Redis queue test directly, place the fn somewhere both can access
2131
async fn get_pool(cfg: &Configuration) -> RedisManager {
2232
RedisManager::from_queue_backend(&cfg.queue_backend(), cfg.redis_pool_max_size).await
@@ -148,3 +158,90 @@ async fn test_many_queue_consumers_delayed() {
148158
)
149159
.await;
150160
}
161+
162+
#[tokio::test]
163+
#[ignore]
164+
async fn test_redis_streams_dlq() {
165+
let mut cfg = get_default_test_config();
166+
cfg.worker_enabled = false;
167+
cfg.redis_pending_duration_secs = 1;
168+
169+
let cfg = std::sync::Arc::new(cfg);
170+
let prefix = svix_ksuid::Ksuid::new(None, None).to_string();
171+
172+
let pool = get_pool(&cfg).await;
173+
let mut conn = pool.get().await.unwrap();
174+
175+
let _: () = conn
176+
.del(format!("{prefix}{{queue}}_svix_v3_main"))
177+
.await
178+
.unwrap();
179+
180+
let _: () = conn
181+
.del(format!("{prefix}{{queue}}_svix_dlq"))
182+
.await
183+
.unwrap();
184+
185+
let (client, _jh) = start_svix_server_with_cfg_and_org_id_and_prefix(
186+
&cfg,
187+
OrganizationId::new(None, None),
188+
prefix.clone(),
189+
)
190+
.await;
191+
192+
let app_id = create_test_app(&client, "v1MessageCRTestApp")
193+
.await
194+
.unwrap()
195+
.id;
196+
197+
let _endp_id = create_test_endpoint(&client, &app_id, "http://localhost:2/bad/url/")
198+
.await
199+
.unwrap()
200+
.id;
201+
202+
let _message_1: MessageOut = client
203+
.post(
204+
&format!("api/v1/app/{app_id}/msg/"),
205+
message_in(&app_id, serde_json::json!({"test": "value"})).unwrap(),
206+
StatusCode::ACCEPTED,
207+
)
208+
.await
209+
.unwrap();
210+
211+
let (_p, mut c) = new_pair(&cfg, Some(&prefix)).await;
212+
213+
let wait_time = std::time::Duration::from_millis(1_500);
214+
for _ in 0..3 {
215+
let res = c.receive_all(wait_time).await.unwrap();
216+
assert!(!res.is_empty());
217+
for j in res {
218+
j.nack().await.unwrap();
219+
}
220+
}
221+
222+
let res = c.receive_all(wait_time).await.unwrap();
223+
assert!(res.is_empty());
224+
225+
tokio::time::sleep(wait_time).await;
226+
227+
// Redrive
228+
client
229+
.put_without_response(
230+
"/api/v1/admin/redrive-dlq",
231+
serde_json::Value::Null,
232+
StatusCode::NO_CONTENT,
233+
)
234+
.await
235+
.unwrap();
236+
237+
for _ in 0..3 {
238+
let res = c.receive_all(wait_time).await.unwrap();
239+
assert!(!res.is_empty());
240+
for j in res {
241+
j.nack().await.unwrap();
242+
}
243+
}
244+
245+
let res = c.receive_all(wait_time).await.unwrap();
246+
assert!(res.is_empty());
247+
}

server/svix-server/tests/it/utils/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,15 @@ pub async fn start_svix_server_with_cfg(
330330
pub async fn start_svix_server_with_cfg_and_org_id(
331331
cfg: &ConfigurationInner,
332332
org_id: OrganizationId,
333+
) -> (TestClient, tokio::task::JoinHandle<()>) {
334+
let prefix = svix_ksuid::Ksuid::new(None, None).to_string();
335+
start_svix_server_with_cfg_and_org_id_and_prefix(cfg, org_id, prefix).await
336+
}
337+
338+
pub async fn start_svix_server_with_cfg_and_org_id_and_prefix(
339+
cfg: &ConfigurationInner,
340+
org_id: OrganizationId,
341+
prefix: String,
333342
) -> (TestClient, tokio::task::JoinHandle<()>) {
334343
let (tracing_subscriber, _guard) = setup_tracing(cfg, /* for_test = */ true);
335344

@@ -340,12 +349,8 @@ pub async fn start_svix_server_with_cfg_and_org_id(
340349
let base_uri = format!("http://{}", listener.local_addr().unwrap());
341350

342351
let jh = tokio::spawn(
343-
svix_server::run_with_prefix(
344-
Some(svix_ksuid::Ksuid::new(None, None).to_string()),
345-
cfg,
346-
Some(listener),
347-
)
348-
.with_subscriber(tracing_subscriber),
352+
svix_server::run_with_prefix(Some(prefix), cfg, Some(listener))
353+
.with_subscriber(tracing_subscriber),
349354
);
350355

351356
(TestClient::new(base_uri, &token), jh)

0 commit comments

Comments
 (0)