Skip to content

Commit 3ab17e7

Browse files
authored
refactor(services/redis): Implement ConnectionLike for RedisConnection (#5748)
* refactor(services/redis): Implement ConnectionLike for RedisConnection Signed-off-by: Xuanwo <[email protected]> * Cleanup Signed-off-by: Xuanwo <[email protected]> * Format cargo Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent ec0dcd2 commit 3ab17e7

File tree

4 files changed

+154
-104
lines changed

4 files changed

+154
-104
lines changed

core/Cargo.lock

Lines changed: 120 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ persy = { version = "1.4.6", optional = true }
317317
# for services-redb
318318
redb = { version = "2", optional = true }
319319
# for services-redis
320-
redis = { version = "0.27", features = [
320+
redis = { version = "0.29", features = [
321321
"cluster-async",
322322
"tokio-comp",
323323
"connection-manager",

core/src/services/redis/backend.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,27 @@
1616
// under the License.
1717

1818
use bb8::RunError;
19+
use bytes::Bytes;
1920
use http::Uri;
2021
use redis::cluster::ClusterClient;
2122
use redis::cluster::ClusterClientBuilder;
22-
use redis::Client;
2323
use redis::ConnectionAddr;
2424
use redis::ConnectionInfo;
2525
use redis::ProtocolVersion;
2626
use redis::RedisConnectionInfo;
27+
use redis::{AsyncCommands, Client};
2728
use std::fmt::Debug;
2829
use std::fmt::Formatter;
2930
use std::path::PathBuf;
3031
use std::time::Duration;
3132
use tokio::sync::OnceCell;
3233

34+
use super::core::*;
3335
use crate::raw::adapters::kv;
3436
use crate::raw::*;
3537
use crate::services::RedisConfig;
3638
use crate::*;
3739

38-
use super::core::*;
3940
const DEFAULT_REDIS_ENDPOINT: &str = "tcp://127.0.0.1:6379";
4041
const DEFAULT_REDIS_PORT: u16 = 6379;
4142

@@ -345,26 +346,33 @@ impl kv::Adapter for Adapter {
345346

346347
async fn get(&self, key: &str) -> Result<Option<Buffer>> {
347348
let mut conn = self.conn().await?;
348-
let result = conn.get(key).await?;
349-
Ok(result)
349+
let result: Option<Bytes> = conn.get(key).await.map_err(format_redis_error)?;
350+
Ok(result.map(Buffer::from))
350351
}
351352

352353
async fn set(&self, key: &str, value: Buffer) -> Result<()> {
353354
let mut conn = self.conn().await?;
354355
let value = value.to_vec();
355-
conn.set(key, value, self.default_ttl).await?;
356+
if let Some(dur) = self.default_ttl {
357+
let _: () = conn
358+
.set_ex(key, value, dur.as_secs())
359+
.await
360+
.map_err(format_redis_error)?;
361+
} else {
362+
let _: () = conn.set(key, value).await.map_err(format_redis_error)?;
363+
}
356364
Ok(())
357365
}
358366

359367
async fn delete(&self, key: &str) -> Result<()> {
360368
let mut conn = self.conn().await?;
361-
conn.delete(key).await?;
369+
let _: () = conn.del(key).await.map_err(format_redis_error)?;
362370
Ok(())
363371
}
364372

365373
async fn append(&self, key: &str, value: &[u8]) -> Result<()> {
366374
let mut conn = self.conn().await?;
367-
conn.append(key, value).await?;
375+
let _: () = conn.append(key, value).await.map_err(format_redis_error)?;
368376
Ok(())
369377
}
370378
}

0 commit comments

Comments
 (0)