Skip to content

Redis refactor #1471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions server/svix-server/src/queue/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{QueueTask, TaskQueueConsumer, TaskQueueProducer};
use crate::{
cfg::{Configuration, QueueType},
error::Result,
redis::{PooledConnection, RedisManager},
redis::{RedisConnection, RedisManager},
};

/// This is the key of the main queue. As a KV store, redis places the entire stream under this key.
Expand Down Expand Up @@ -240,15 +240,15 @@ fn task_from_redis_key(key: &str) -> serde_json::Result<Arc<QueueTask>> {
serde_json::from_str(&key[pos + 1..])
}

async fn migrate_v2_to_v3_queues(conn: &mut PooledConnection<'_>) -> Result<()> {
async fn migrate_v2_to_v3_queues(conn: &mut RedisConnection<'_>) -> Result<()> {
migrate_list_to_stream(conn, LEGACY_V2_MAIN, MAIN).await?;
migrate_list_to_stream(conn, LEGACY_V2_PROCESSING, MAIN).await?;

Ok(())
}

async fn migrate_list_to_stream(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn migrate_list_to_stream(
}
}

async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()> {
async fn migrate_v1_to_v2_queues(conn: &mut RedisConnection<'_>) -> Result<()> {
migrate_list(conn, LEGACY_V1_MAIN, LEGACY_V2_MAIN).await?;
migrate_list(conn, LEGACY_V1_PROCESSING, LEGACY_V2_PROCESSING).await?;
migrate_sset(conn, LEGACY_V1_DELAYED, DELAYED).await?;
Expand All @@ -295,7 +295,7 @@ async fn migrate_v1_to_v2_queues(conn: &mut PooledConnection<'_>) -> Result<()>
}

async fn migrate_list(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand All @@ -318,7 +318,7 @@ async fn migrate_list(
}

async fn migrate_sset(
conn: &mut PooledConnection<'_>,
conn: &mut RedisConnection<'_>,
legacy_queue: &str,
queue: &str,
) -> Result<()> {
Expand Down
166 changes: 83 additions & 83 deletions server/svix-server/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl RedisManager {
}
}

pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
match self {
Self::Clustered(pool) => pool.get().await,
Self::NonClustered(pool) => pool.get().await,
Expand All @@ -103,11 +103,11 @@ pub struct ClusteredRedisPool {
}

impl ClusteredRedisPool {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
let con = ClusteredPooledConnection {
con: self.pool.get().await?,
};
Ok(PooledConnection::Clustered(con))
Ok(RedisConnection::Clustered(con))
}
}

Expand All @@ -117,8 +117,8 @@ pub struct ClusteredRedisUnpooled {
}

impl ClusteredRedisUnpooled {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
Ok(PooledConnection::ClusteredUnpooled(
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
Ok(RedisConnection::ClusteredUnpooled(
ClusteredUnpooledConnection {
con: self.con.clone(),
},
Expand All @@ -138,8 +138,8 @@ pub struct NonClusteredRedisUnpooled {
}

impl NonClusteredRedisUnpooled {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
Ok(PooledConnection::NonClusteredUnpooled(
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
Ok(RedisConnection::NonClusteredUnpooled(
NonClusteredUnpooledConnection {
con: self.con.clone(),
},
Expand All @@ -159,21 +159,21 @@ pub struct NonClusteredRedisPool {
}

impl NonClusteredRedisPool {
pub async fn get(&self) -> Result<PooledConnection<'_>, RunError<RedisError>> {
pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
let con = self.pool.get().await?;
let con = NonClusteredPooledConnection { con };
Ok(PooledConnection::NonClustered(con))
Ok(RedisConnection::NonClustered(con))
}
}

pub enum PooledConnection<'a> {
pub enum RedisConnection<'a> {
Clustered(ClusteredPooledConnection<'a>),
ClusteredUnpooled(ClusteredUnpooledConnection),
NonClustered(NonClusteredPooledConnection<'a>),
NonClusteredUnpooled(NonClusteredUnpooledConnection),
}

impl PooledConnection<'_> {
impl RedisConnection<'_> {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(self).await
}
Expand All @@ -186,16 +186,16 @@ impl PooledConnection<'_> {
}
}

impl redis::aio::ConnectionLike for PooledConnection<'_> {
impl redis::aio::ConnectionLike for RedisConnection<'_> {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
PooledConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
PooledConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
PooledConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
PooledConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
RedisConnection::Clustered(conn) => conn.con.req_packed_command(cmd),
RedisConnection::NonClustered(conn) => conn.con.req_packed_command(cmd),
RedisConnection::ClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
RedisConnection::NonClusteredUnpooled(conn) => conn.con.req_packed_command(cmd),
}
}

Expand All @@ -206,96 +206,96 @@ impl redis::aio::ConnectionLike for PooledConnection<'_> {
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
PooledConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
PooledConnection::NonClustered(conn) => {
RedisConnection::Clustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
RedisConnection::NonClustered(conn) => conn.con.req_packed_commands(cmd, offset, count),
RedisConnection::ClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
PooledConnection::ClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
PooledConnection::NonClusteredUnpooled(conn) => {
RedisConnection::NonClusteredUnpooled(conn) => {
conn.con.req_packed_commands(cmd, offset, count)
}
}
}

fn get_db(&self) -> i64 {
match self {
PooledConnection::Clustered(conn) => conn.con.get_db(),
PooledConnection::NonClustered(conn) => conn.con.get_db(),
PooledConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
PooledConnection::NonClusteredUnpooled(conn) => conn.con.get_db(),
RedisConnection::Clustered(conn) => conn.con.get_db(),
RedisConnection::NonClustered(conn) => conn.con.get_db(),
RedisConnection::ClusteredUnpooled(conn) => conn.con.get_db(),
RedisConnection::NonClusteredUnpooled(conn) => conn.con.get_db(),
}
}
}

pub struct NonClusteredPooledConnection<'a> {
con: bb8::PooledConnection<'a, RedisConnectionManager>,
}

impl<'a> NonClusteredPooledConnection<'a> {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut *self.con).await
}

pub async fn query_async_pipeline<T: FromRedisValue>(
&mut self,
pipe: redis::Pipeline,
) -> RedisResult<T> {
pipe.query_async(&mut *self.con).await
}
}
macro_rules! pooled_connection {
(
$(
$struct_name:ident,
$con_type:ty
),*
) => {
$(
pub struct $struct_name<'a> {
con: $con_type,
}

pub struct NonClusteredUnpooledConnection {
con: redis::aio::ConnectionManager,
}
impl<'a> $struct_name<'a> {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut *self.con).await
}

impl NonClusteredUnpooledConnection {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut self.con).await
}

pub async fn query_async_pipeline<T: FromRedisValue>(
&mut self,
pipe: redis::Pipeline,
) -> RedisResult<T> {
pipe.query_async(&mut self.con).await
pub async fn query_async_pipeline<T: FromRedisValue>(&mut self, pipe: redis::Pipeline) -> RedisResult<T> {
pipe.query_async(&mut *self.con).await
}
}
)*
}
}

pub struct ClusteredPooledConnection<'a> {
con: bb8::PooledConnection<'a, RedisClusterConnectionManager>,
}
macro_rules! connection {
(
$(
$struct_name:ident,
$con_type:ty
),*
) => {
$(
pub struct $struct_name {
con: $con_type,
}

impl<'a> ClusteredPooledConnection<'a> {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut *self.con).await
}
impl $struct_name {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut self.con).await
}

pub async fn query_async_pipeline<T: FromRedisValue>(
&mut self,
pipe: redis::Pipeline,
) -> RedisResult<T> {
pipe.query_async(&mut *self.con).await
pub async fn query_async_pipeline<T: FromRedisValue>(&mut self, pipe: redis::Pipeline) -> RedisResult<T> {
pipe.query_async(&mut self.con).await
}
}
)*
}
}

pub struct ClusteredUnpooledConnection {
con: redis::cluster_async::ClusterConnection,
}

impl ClusteredUnpooledConnection {
pub async fn query_async<T: FromRedisValue>(&mut self, cmd: redis::Cmd) -> RedisResult<T> {
cmd.query_async(&mut self.con).await
}

pub async fn query_async_pipeline<T: FromRedisValue>(
&mut self,
pipe: redis::Pipeline,
) -> RedisResult<T> {
pipe.query_async(&mut self.con).await
}
}
pooled_connection!(
NonClusteredPooledConnection,
bb8::PooledConnection<'a, RedisConnectionManager>
);

pooled_connection!(
ClusteredPooledConnection,
bb8::PooledConnection<'a, RedisClusterConnectionManager>
);

connection!(
NonClusteredUnpooledConnection,
redis::aio::ConnectionManager
);

connection!(
ClusteredUnpooledConnection,
redis::cluster_async::ClusterConnection
);

#[cfg(test)]
mod tests {
Expand Down