diff --git a/server/svix-server/src/queue/redis.rs b/server/svix-server/src/queue/redis.rs index 90658b0e7..d653a6b39 100644 --- a/server/svix-server/src/queue/redis.rs +++ b/server/svix-server/src/queue/redis.rs @@ -36,7 +36,7 @@ use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer}; use crate::{ cfg::{Configuration, QueueType}, error::Result, - redis::{PooledConnection, RedisManager}, + redis::{RedisConnection, RedisManager}, }; /// This is the key of the main queue. As a KV store, redis places the entire stream under this key. @@ -240,7 +240,7 @@ fn task_from_redis_key(key: &str) -> serde_json::Result> { serde_json::from_str(&key[pos + 1..]) } -async fn migrate_v2_to_v3_queues(conn: &mut PooledConnection<'_>) -> Result<()> { +async fn migrate_v2_to_v3_queues(conn: &mut RedisConnection<'_>) -> Result<()> { migrate_list_to_stream(conn, LEGACY_V2_MAIN, MAIN).await?; migrate_list_to_stream(conn, LEGACY_V2_PROCESSING, MAIN).await?; @@ -248,7 +248,7 @@ async fn migrate_v2_to_v3_queues(conn: &mut PooledConnection<'_>) -> Result<()> } async fn migrate_list_to_stream( - conn: &mut PooledConnection<'_>, + conn: &mut RedisConnection<'_>, legacy_queue: &str, queue: &str, ) -> Result<()> { @@ -286,7 +286,7 @@ async fn migrate_list_to_stream( } } -async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()> { +async fn migrate_v1_to_v2_queues(conn: &mut RedisConnection<'_>) -> Result<()> { migrate_list(conn, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await?; migrate_list(conn, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await?; migrate_sset(conn, LEGACY_V1_DELAYED, DELAYED).await?; @@ -295,7 +295,7 @@ async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()> } async fn migrate_list( - conn: &mut PooledConnection<'_>, + conn: &mut RedisConnection<'_>, legacy_queue: &str, queue: &str, ) -> Result<()> { @@ -318,7 +318,7 @@ async fn migrate_list( } async fn migrate_sset( - conn: &mut PooledConnection<'_>, + conn: &mut RedisConnection<'_>, legacy_queue: &str, queue: &str, ) -> Result<()> { diff --git a/server/svix-server/src/redis/mod.rs b/server/svix-server/src/redis/mod.rs index 87581aa4c..dd0f152c9 100644 --- a/server/svix-server/src/redis/mod.rs +++ b/server/svix-server/src/redis/mod.rs @@ -87,7 +87,7 @@ impl RedisManager { } } - pub async fn get(&self) -> Result, RunError> { + pub async fn get(&self) -> Result, RunError> { match self { Self::Clustered(pool) => pool.get().await, Self::NonClustered(pool) => pool.get().await, @@ -103,11 +103,11 @@ pub struct ClusteredRedisPool { } impl ClusteredRedisPool { - pub async fn get(&self) -> Result, RunError> { + pub async fn get(&self) -> Result, RunError> { let con = ClusteredPooledConnection { con: self.pool.get().await?, }; - Ok(PooledConnection::Clustered(con)) + Ok(RedisConnection::Clustered(con)) } } @@ -117,8 +117,8 @@ pub struct ClusteredRedisUnpooled { } impl ClusteredRedisUnpooled { - pub async fn get(&self) -> Result, RunError> { - Ok(PooledConnection::ClusteredUnpooled( + pub async fn get(&self) -> Result, RunError> { + Ok(RedisConnection::ClusteredUnpooled( ClusteredUnpooledConnection { con: self.con.clone(), }, @@ -138,8 +138,8 @@ pub struct NonClusteredRedisUnpooled { } impl NonClusteredRedisUnpooled { - pub async fn get(&self) -> Result, RunError> { - Ok(PooledConnection::NonClusteredUnpooled( + pub async fn get(&self) -> Result, RunError> { + Ok(RedisConnection::NonClusteredUnpooled( NonClusteredUnpooledConnection { con: self.con.clone(), }, @@ -159,21 +159,21 @@ pub struct NonClusteredRedisPool { } impl NonClusteredRedisPool { - pub async fn get(&self) -> Result, RunError> { + pub async fn get(&self) -> Result, RunError> { let con = self.pool.get().await?; let con = NonClusteredPooledConnection { con }; - Ok(PooledConnection::NonClustered(con)) + Ok(RedisConnection::NonClustered(con)) } } -pub enum PooledConnection<'a> { +pub enum RedisConnection<'a> { Clustered(ClusteredPooledConnection<'a>), ClusteredUnpooled(ClusteredUnpooledConnection), NonClustered(NonClusteredPooledConnection<'a>), NonClusteredUnpooled(NonClusteredUnpooledConnection), } -impl PooledConnection<'_> { +impl RedisConnection<'_> { pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { cmd.query_async(self).await } @@ -186,16 +186,16 @@ impl PooledConnection<'_> { } } -impl redis::aio::ConnectionLike for PooledConnection<'_> { +impl redis::aio::ConnectionLike for RedisConnection<'_> { fn req_packed_command<'a>( &'a mut self, cmd: &'a redis::Cmd, ) -> redis::RedisFuture<'a, redis::Value> { match self { - PooledConnection::Clustered(conn) => conn.con.req_packed_command(cmd), - PooledConnection::NonClustered(conn) => conn.con.req_packed_command(cmd), - PooledConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), - PooledConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), + RedisConnection::Clustered(conn) => conn.con.req_packed_command(cmd), + RedisConnection::NonClustered(conn) => conn.con.req_packed_command(cmd), + RedisConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), + RedisConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd), } } @@ -206,14 +206,12 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> { count: usize, ) -> redis::RedisFuture<'a, Vec> { match self { - PooledConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count), - PooledConnection::NonClustered(conn) => { + RedisConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustered(conn) => conn.con.req_packed_commands(cmd, offset, count), + RedisConnection::ClusteredUnpooled(conn) => { conn.con.req_packed_commands(cmd, offset, count) } - PooledConnection::ClusteredUnpooled(conn) => { - conn.con.req_packed_commands(cmd, offset, count) - } - PooledConnection::NonClusteredUnpooled(conn) => { + RedisConnection::NonClusteredUnpooled(conn) => { conn.con.req_packed_commands(cmd, offset, count) } } @@ -221,81 +219,83 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> { fn get_db(&self) -> i64 { match self { - PooledConnection::Clustered(conn) => conn.con.get_db(), - PooledConnection::NonClustered(conn) => conn.con.get_db(), - PooledConnection::ClusteredUnpooled(conn) => conn.con.get_db(), - PooledConnection::NonClusteredUnpooled(conn) => conn.con.get_db(), + RedisConnection::Clustered(conn) => conn.con.get_db(), + RedisConnection::NonClustered(conn) => conn.con.get_db(), + RedisConnection::ClusteredUnpooled(conn) => conn.con.get_db(), + RedisConnection::NonClusteredUnpooled(conn) => conn.con.get_db(), } } } -pub struct NonClusteredPooledConnection<'a> { - con: bb8::PooledConnection<'a, RedisConnectionManager>, -} - -impl<'a> NonClusteredPooledConnection<'a> { - pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { - cmd.query_async(&mut *self.con).await - } - - pub async fn query_async_pipeline( - &mut self, - pipe: redis::Pipeline, - ) -> RedisResult { - pipe.query_async(&mut *self.con).await - } -} +macro_rules! pooled_connection { + ( + $( + $struct_name:ident, + $con_type:ty + ),* + ) => { + $( + pub struct $struct_name<'a> { + con: $con_type, + } -pub struct NonClusteredUnpooledConnection { - con: redis::aio::ConnectionManager, -} + impl<'a> $struct_name<'a> { + pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { + cmd.query_async(&mut *self.con).await + } -impl NonClusteredUnpooledConnection { - pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { - cmd.query_async(&mut self.con).await - } - - pub async fn query_async_pipeline( - &mut self, - pipe: redis::Pipeline, - ) -> RedisResult { - pipe.query_async(&mut self.con).await + pub async fn query_async_pipeline(&mut self, pipe: redis::Pipeline) -> RedisResult { + pipe.query_async(&mut *self.con).await + } + } + )* } } -pub struct ClusteredPooledConnection<'a> { - con: bb8::PooledConnection<'a, RedisClusterConnectionManager>, -} +macro_rules! connection { + ( + $( + $struct_name:ident, + $con_type:ty + ),* + ) => { + $( + pub struct $struct_name { + con: $con_type, + } -impl<'a> ClusteredPooledConnection<'a> { - pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { - cmd.query_async(&mut *self.con).await - } + impl $struct_name { + pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { + cmd.query_async(&mut self.con).await + } - pub async fn query_async_pipeline( - &mut self, - pipe: redis::Pipeline, - ) -> RedisResult { - pipe.query_async(&mut *self.con).await + pub async fn query_async_pipeline(&mut self, pipe: redis::Pipeline) -> RedisResult { + pipe.query_async(&mut self.con).await + } + } + )* } } -pub struct ClusteredUnpooledConnection { - con: redis::cluster_async::ClusterConnection, -} - -impl ClusteredUnpooledConnection { - pub async fn query_async(&mut self, cmd: redis::Cmd) -> RedisResult { - cmd.query_async(&mut self.con).await - } - - pub async fn query_async_pipeline( - &mut self, - pipe: redis::Pipeline, - ) -> RedisResult { - pipe.query_async(&mut self.con).await - } -} +pooled_connection!( + NonClusteredPooledConnection, + bb8::PooledConnection<'a, RedisConnectionManager> +); + +pooled_connection!( + ClusteredPooledConnection, + bb8::PooledConnection<'a, RedisClusterConnectionManager> +); + +connection!( + NonClusteredUnpooledConnection, + redis::aio::ConnectionManager +); + +connection!( + ClusteredUnpooledConnection, + redis::cluster_async::ClusterConnection +); #[cfg(test)] mod tests {