Skip to content

Commit 5b9d232

Browse files
committed
Add redis sentinel support
This adds support for using Redis Sentinel for queuing and caching. Sentinel potentially requires a lot of additional configuration params, so support for that that has been added here, although the `redis_dsn` field has been reused and should point to a sentinel server if using the `redissentinel` queue/cache types.
1 parent 7cb02ce commit 5b9d232

File tree

11 files changed

+439
-66
lines changed

11 files changed

+439
-66
lines changed

server/Cargo.lock

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/run-tests.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,16 @@ echo "*********** RUN 6 ***********"
8080
${TEST_COMMAND} -- --ignored rabbitmq
8181
fi
8282
)
83+
84+
echo "*********** RUN 7 ***********"
85+
(
86+
export SVIX_QUEUE_TYPE="redissentinel"
87+
export SVIX_CACHE_TYPE="redissentinel"
88+
export SVIX_REDIS_DSN="redis://localhost:26379"
89+
export SVIX_SENTINEL_SERVICE_NAME="master0"
90+
91+
${TEST_COMMAND} "$@"
92+
if [[ -z "$@" ]]; then
93+
${TEST_COMMAND} -- --ignored redis
94+
fi
95+
)

server/svix-server/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ ed25519-compact = "2.1.1"
4444
chrono = { version="0.4.26", features = ["serde"] }
4545
reqwest = { version = "0.11.27", features = ["json", "rustls-tls", "hickory-resolver"], default-features = false }
4646
bb8 = "0.8"
47-
bb8-redis = "0.15.0"
48-
redis = { version = "0.25.4", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager"] }
47+
bb8-redis = "0.16.0"
48+
redis = { version = "0.26", features = ["tokio-comp", "tokio-native-tls-comp", "streams", "cluster-async", "tcp_nodelay", "connection-manager", "sentinel"] }
4949
thiserror = "1.0.30"
5050
bytes = "1.1.0"
5151
blake2 = "0.10.4"
@@ -68,7 +68,7 @@ urlencoding = "2.1.2"
6868
form_urlencoded = "1.1.0"
6969
lapin = "2.1.1"
7070
sentry = { version = "0.32.2", features = ["tracing"] }
71-
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "5ae22000e2ea214ba707cac81657f098e5785a76", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster"] }
71+
omniqueue = { git = "https://github.com/jaymell/omniqueue-rs", rev = "55f3bccb4298c5de75da1358675f6bfadb016222", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster", "redis_sentinel"] }
7272
# Not a well-known author, and no longer gets updates => pinned.
7373
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
7474
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }

server/svix-server/src/cfg.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,16 @@ pub struct ConfigurationInner {
131131
pub db_pool_max_size: u16,
132132

133133
/// The DSN for redis (can be left empty if not using redis)
134+
/// Note that if using Redis Sentinel, this will be the the DSN
135+
/// for a Sentinel instance.
134136
pub redis_dsn: Option<String>,
135137
/// The maximum number of connections for the Redis pool
136138
#[validate(range(min = 10))]
137139
pub redis_pool_max_size: u16,
138140

141+
#[serde(flatten, default)]
142+
pub redis_sentinel_cfg: Option<SentinelConfig>,
143+
139144
/// What kind of message queue to use. Supported: memory, redis (must have redis_dsn or
140145
/// queue_dsn configured).
141146
pub queue_type: QueueType,
@@ -255,6 +260,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio
255260
});
256261
}
257262
}
263+
CacheType::RedisSentinel => {
264+
if config.cache_dsn().is_none() {
265+
return Err(ValidationError {
266+
code: Cow::from("missing field"),
267+
message: Some(Cow::from(
268+
"The redis_dsn or cache_dsn field must be set if the cache_type is `redissentinel`"
269+
)),
270+
params: HashMap::new(),
271+
});
272+
}
273+
274+
if config.redis_sentinel_cfg.is_none() {
275+
return Err(ValidationError {
276+
code: Cow::from("missing field"),
277+
message: Some(Cow::from(
278+
"sentinel_service_name must be set if the cache_type is `redissentinel`",
279+
)),
280+
params: HashMap::new(),
281+
});
282+
}
283+
}
258284
}
259285

260286
match config.queue_type {
@@ -281,6 +307,27 @@ fn validate_config_complete(config: &ConfigurationInner) -> Result<(), Validatio
281307
});
282308
}
283309
}
310+
QueueType::RedisSentinel => {
311+
if config.queue_dsn().is_none() {
312+
return Err(ValidationError {
313+
code: Cow::from("missing field"),
314+
message: Some(Cow::from(
315+
"The redis_dsn or queue_dsn field must be set if the queue_type is `redissentinel`"
316+
)),
317+
params: HashMap::new(),
318+
});
319+
}
320+
321+
if config.redis_sentinel_cfg.is_none() {
322+
return Err(ValidationError {
323+
code: Cow::from("missing field"),
324+
message: Some(Cow::from(
325+
"sentinel_service_name must be set if the queue_type is `redissentinel`",
326+
)),
327+
params: HashMap::new(),
328+
});
329+
}
330+
}
284331
}
285332

286333
Ok(())
@@ -304,6 +351,10 @@ impl ConfigurationInner {
304351
QueueType::Memory => QueueBackend::Memory,
305352
QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)),
306353
QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)),
354+
QueueType::RedisSentinel => QueueBackend::RedisSentinel(
355+
self.queue_dsn().expect(err),
356+
self.redis_sentinel_cfg.as_ref().expect(err),
357+
),
307358
QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)),
308359
}
309360
}
@@ -318,6 +369,10 @@ impl ConfigurationInner {
318369
CacheType::Memory => CacheBackend::Memory,
319370
CacheType::Redis => CacheBackend::Redis(self.cache_dsn().expect(err)),
320371
CacheType::RedisCluster => CacheBackend::RedisCluster(self.cache_dsn().expect(err)),
372+
CacheType::RedisSentinel => CacheBackend::RedisSentinel(
373+
self.cache_dsn().expect(err),
374+
self.redis_sentinel_cfg.as_ref().expect(err),
375+
),
321376
}
322377
}
323378
}
@@ -346,6 +401,7 @@ pub enum QueueBackend<'a> {
346401
Memory,
347402
Redis(&'a str),
348403
RedisCluster(&'a str),
404+
RedisSentinel(&'a str, &'a SentinelConfig),
349405
RabbitMq(&'a str),
350406
}
351407

@@ -355,6 +411,7 @@ pub enum CacheBackend<'a> {
355411
Memory,
356412
Redis(&'a str),
357413
RedisCluster(&'a str),
414+
RedisSentinel(&'a str, &'a SentinelConfig),
358415
}
359416

360417
#[derive(Clone, Debug, Deserialize)]
@@ -378,6 +435,7 @@ pub enum QueueType {
378435
Memory,
379436
Redis,
380437
RedisCluster,
438+
RedisSentinel,
381439
RabbitMQ,
382440
}
383441

@@ -387,6 +445,7 @@ pub enum CacheType {
387445
Memory,
388446
Redis,
389447
RedisCluster,
448+
RedisSentinel,
390449
None,
391450
}
392451

@@ -430,6 +489,40 @@ impl fmt::Display for LogLevel {
430489
}
431490
}
432491

492+
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
493+
pub struct SentinelConfig {
494+
#[serde(rename = "sentinel_service_name")]
495+
pub service_name: String,
496+
#[serde(default)]
497+
pub redis_tls_mode_secure: bool,
498+
pub redis_db: Option<i64>,
499+
pub redis_username: Option<String>,
500+
pub redis_password: Option<String>,
501+
#[serde(default)]
502+
pub redis_use_resp3: bool,
503+
}
504+
505+
impl From<SentinelConfig> for omniqueue::backends::redis::SentinelConfig {
506+
fn from(val: SentinelConfig) -> Self {
507+
let SentinelConfig {
508+
service_name,
509+
redis_tls_mode_secure,
510+
redis_db,
511+
redis_username,
512+
redis_password,
513+
redis_use_resp3,
514+
} = val;
515+
omniqueue::backends::redis::SentinelConfig {
516+
service_name,
517+
redis_tls_mode_secure,
518+
redis_db,
519+
redis_username,
520+
redis_password,
521+
redis_use_resp3,
522+
}
523+
}
524+
}
525+
433526
pub fn load() -> Result<Arc<ConfigurationInner>> {
434527
if let Ok(db_url) = std::env::var("DATABASE_URL") {
435528
// If we have DATABASE_URL set, we should potentially use it.

server/svix-server/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ pub async fn run_with_prefix(
111111
let cache = match &cache_backend {
112112
CacheBackend::None => cache::none::new(),
113113
CacheBackend::Memory => cache::memory::new(),
114-
CacheBackend::Redis(_) | CacheBackend::RedisCluster(_) => {
114+
CacheBackend::Redis(_)
115+
| CacheBackend::RedisCluster(_)
116+
| CacheBackend::RedisSentinel(_, _) => {
115117
let mgr = RedisManager::from_cache_backend(&cache_backend).await;
116118
cache::redis::new(mgr)
117119
}

server/svix-server/src/queue/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ pub async fn new_pair(
3333
prefix: Option<&str>,
3434
) -> (TaskQueueProducer, TaskQueueConsumer) {
3535
match cfg.queue_backend() {
36-
QueueBackend::Redis(_) | QueueBackend::RedisCluster(_) => {
37-
redis::new_pair(cfg, prefix).await
38-
}
36+
QueueBackend::Redis(_)
37+
| QueueBackend::RedisCluster(_)
38+
| QueueBackend::RedisSentinel(_, _) => redis::new_pair(cfg, prefix).await,
3939
QueueBackend::Memory => {
4040
let (producer, consumer) = InMemoryBackend::builder()
4141
.build_pair()

server/svix-server/src/queue/redis.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ async fn new_pair_inner(
206206
consumer_name: WORKER_CONSUMER.to_owned(),
207207
payload_key: QUEUE_KV_KEY.to_owned(),
208208
ack_deadline_ms: pending_duration,
209+
dlq_config: None,
210+
sentinel_config: cfg.redis_sentinel_cfg.clone().map(|c| c.into()),
209211
};
210212

211213
match &cfg.queue_type {
@@ -219,7 +221,17 @@ async fn new_pair_inner(
219221
let consumer = TaskQueueConsumer::new(consumer);
220222
(producer, consumer)
221223
}
222-
_ => {
224+
QueueType::RedisSentinel => {
225+
let (producer, consumer) = RedisBackend::sentinel_builder(config)
226+
.build_pair()
227+
.await
228+
.expect("Error initializing redis-cluster queue");
229+
230+
let producer = TaskQueueProducer::new(producer);
231+
let consumer = TaskQueueConsumer::new(consumer);
232+
(producer, consumer)
233+
}
234+
QueueType::Redis => {
223235
let (producer, consumer) = RedisBackend::builder(config)
224236
.build_pair()
225237
.await
@@ -229,6 +241,7 @@ async fn new_pair_inner(
229241
let consumer = TaskQueueConsumer::new(consumer);
230242
(producer, consumer)
231243
}
244+
_ => panic!("Unsupported backend!"),
232245
}
233246
}
234247

0 commit comments

Comments
 (0)