Skip to content

Commit 495afdd

Browse files
authored
Merge pull request #12 from fjall-rs/generic-blob-cache
Generic blob cache
2 parents ba44eab + a4a8f9b commit 495afdd

23 files changed

+222
-436
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ byteview = { version = "0.6.1" }
2828
interval-heap = "0.0.5"
2929
log = "0.4.22"
3030
path-absolutize = "3.1.1"
31-
quick_cache = { version = "0.6.5", default-features = false }
3231
rustc-hash = "2.0.0"
3332
serde = { version = "1.0.215", optional = true, features = ["derive"] }
3433
tempfile = "3.12.0"

benches/value_log.rs

Lines changed: 81 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,70 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
22
use rand::{Rng, RngCore};
3-
use std::sync::Arc;
3+
use std::{
4+
collections::BTreeMap,
5+
sync::{Arc, RwLock},
6+
};
47
use value_log::{
5-
BlobCache, Compressor, Config, IndexReader, IndexWriter, MockIndex, MockIndexWriter, ValueLog,
8+
BlobCache, Compressor, Config, IndexReader, IndexWriter, UserKey, UserValue, ValueHandle,
9+
ValueLog, ValueLogId,
610
};
711

12+
type MockIndexInner = RwLock<BTreeMap<UserKey, (ValueHandle, u32)>>;
13+
14+
/// Mock in-memory index
15+
#[allow(clippy::module_name_repetitions)]
16+
#[derive(Clone, Default)]
17+
pub struct MockIndex(Arc<MockIndexInner>);
18+
19+
impl std::ops::Deref for MockIndex {
20+
type Target = MockIndexInner;
21+
22+
fn deref(&self) -> &Self::Target {
23+
&self.0
24+
}
25+
}
26+
27+
impl MockIndex {
28+
/// Remove item
29+
pub fn remove(&self, key: &[u8]) {
30+
self.0.write().expect("lock is poisoned").remove(key);
31+
}
32+
}
33+
34+
impl IndexReader for MockIndex {
35+
fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> {
36+
Ok(self
37+
.read()
38+
.expect("lock is poisoned")
39+
.get(key)
40+
.map(|(vhandle, _)| vhandle)
41+
.cloned())
42+
}
43+
}
44+
45+
/// Used for tests only
46+
#[allow(clippy::module_name_repetitions)]
47+
pub struct MockIndexWriter(pub MockIndex);
48+
49+
impl IndexWriter for MockIndexWriter {
50+
fn insert_indirect(
51+
&mut self,
52+
key: &[u8],
53+
value: ValueHandle,
54+
size: u32,
55+
) -> std::io::Result<()> {
56+
self.0
57+
.write()
58+
.expect("lock is poisoned")
59+
.insert(key.into(), (value, size));
60+
Ok(())
61+
}
62+
63+
fn finish(&mut self) -> std::io::Result<()> {
64+
Ok(())
65+
}
66+
}
67+
868
#[derive(Clone, Default)]
969
struct NoCompressor;
1070

@@ -18,6 +78,17 @@ impl Compressor for NoCompressor {
1878
}
1979
}
2080

81+
#[derive(Clone)]
82+
struct NoCacher;
83+
84+
impl BlobCache for NoCacher {
85+
fn get(&self, _: ValueLogId, _: &ValueHandle) -> Option<UserValue> {
86+
None
87+
}
88+
89+
fn insert(&self, _: ValueLogId, _: &ValueHandle, _: UserValue) {}
90+
}
91+
2192
fn prefetch(c: &mut Criterion) {
2293
let mut group = c.benchmark_group("prefetch range");
2394

@@ -30,11 +101,11 @@ fn prefetch(c: &mut Criterion) {
30101
let folder = tempfile::tempdir().unwrap();
31102
let vl_path = folder.path();
32103

33-
let value_log = ValueLog::open(vl_path, Config::<NoCompressor>::default()).unwrap();
104+
let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
34105

35106
let mut writer = value_log.get_writer().unwrap();
36107

37-
let mut rng = rand::thread_rng();
108+
let mut rng: rand::prelude::ThreadRng = rand::rng();
38109

39110
for key in (0u64..2_000_000).map(u64::to_be_bytes) {
40111
let mut data = vec![0u8; item_size];
@@ -51,11 +122,11 @@ fn prefetch(c: &mut Criterion) {
51122

52123
value_log.register_writer(writer).unwrap();
53124

54-
let mut rng = rand::thread_rng();
125+
let mut rng = rand::rng();
55126

56127
group.bench_function(format!("{range_size}x{item_size}B - no prefetch"), |b| {
57128
b.iter(|| {
58-
let start = rng.gen_range(0u64..1_999_000);
129+
let start = rng.random_range(0u64..1_999_000);
59130

60131
for x in start..(start + range_size) {
61132
let vhandle = index.get(&x.to_be_bytes()).unwrap().unwrap();
@@ -67,7 +138,7 @@ fn prefetch(c: &mut Criterion) {
67138

68139
group.bench_function(format!("{range_size}x{item_size}B - with prefetch"), |b| {
69140
b.iter(|| {
70-
let start = rng.gen_range(0u64..1_999_000);
141+
let start = rng.random_range(0u64..1_999_000);
71142

72143
{
73144
let vhandle = index.get(&start.to_be_bytes()).unwrap().unwrap();
@@ -113,65 +184,11 @@ fn load_value(c: &mut Criterion) {
113184
let folder = tempfile::tempdir().unwrap();
114185
let vl_path = folder.path();
115186

116-
let value_log = ValueLog::open(
117-
vl_path,
118-
Config::<NoCompressor>::default()
119-
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
120-
)
121-
.unwrap();
122-
123-
let mut writer = value_log.get_writer().unwrap();
124-
125-
let mut rng = rand::thread_rng();
126-
127-
for size in sizes {
128-
let key = size.to_string();
129-
130-
let mut data = vec![0u8; size];
131-
rng.fill_bytes(&mut data);
132-
133-
index_writer
134-
.insert_indirect(
135-
key.as_bytes(),
136-
writer.get_next_value_handle(),
137-
data.len() as u32,
138-
)
139-
.unwrap();
140-
141-
writer.write(key.as_bytes(), &data).unwrap();
142-
}
143-
144-
value_log.register_writer(writer).unwrap();
145-
146-
for size in sizes {
147-
let key = size.to_string();
148-
let vhandle = index.get(key.as_bytes()).unwrap().unwrap();
149-
150-
group.bench_function(format!("{size} bytes (uncached)"), |b| {
151-
b.iter(|| {
152-
value_log.get(&vhandle).unwrap().unwrap();
153-
})
154-
});
155-
}
156-
}
157-
158-
{
159-
let index = MockIndex::default();
160-
let mut index_writer = MockIndexWriter(index.clone());
161-
162-
let folder = tempfile::tempdir().unwrap();
163-
let vl_path = folder.path();
164-
165-
let value_log = ValueLog::open(
166-
vl_path,
167-
Config::<NoCompressor>::default()
168-
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
169-
)
170-
.unwrap();
187+
let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
171188

172189
let mut writer = value_log.get_writer().unwrap();
173190

174-
let mut rng = rand::thread_rng();
191+
let mut rng = rand::rng();
175192

176193
for size in sizes {
177194
let key = size.to_string();
@@ -196,10 +213,7 @@ fn load_value(c: &mut Criterion) {
196213
let key = size.to_string();
197214
let vhandle = index.get(key.as_bytes()).unwrap().unwrap();
198215

199-
// NOTE: Warm up cache
200-
value_log.get(&vhandle).unwrap().unwrap();
201-
202-
group.bench_function(format!("{size} bytes (cached)"), |b| {
216+
group.bench_function(format!("{size} bytes"), |b| {
203217
b.iter(|| {
204218
value_log.get(&vhandle).unwrap().unwrap();
205219
})

src/blob_cache.rs

Lines changed: 6 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2,106 +2,17 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use crate::{value::UserValue, value_log::ValueLogId, ValueHandle};
6-
use quick_cache::{sync::Cache, Equivalent, Weighter};
7-
8-
type Item = UserValue;
9-
10-
#[derive(Eq, std::hash::Hash, PartialEq)]
11-
pub struct CacheKey(ValueLogId, ValueHandle);
12-
13-
impl Equivalent<CacheKey> for (ValueLogId, &ValueHandle) {
14-
fn equivalent(&self, key: &CacheKey) -> bool {
15-
self.0 == key.0 && self.1 == &key.1
16-
}
17-
}
18-
19-
impl From<(ValueLogId, ValueHandle)> for CacheKey {
20-
fn from((vid, vhandle): (ValueLogId, ValueHandle)) -> Self {
21-
Self(vid, vhandle)
22-
}
23-
}
24-
25-
#[derive(Clone)]
26-
struct BlobWeighter;
27-
28-
impl Weighter<CacheKey, Item> for BlobWeighter {
29-
#[allow(clippy::cast_possible_truncation)]
30-
fn weight(&self, _: &CacheKey, blob: &Item) -> u64 {
31-
blob.len() as u64
32-
}
33-
}
5+
use crate::{value_log::ValueLogId, UserValue, ValueHandle};
346

357
/// Blob cache, in which blobs are cached in-memory
368
/// after being retrieved from disk
379
///
3810
/// This speeds up consecutive accesses to the same blobs, improving
3911
/// read performance for hot data.
40-
pub struct BlobCache {
41-
// NOTE: rustc_hash performed best: https://fjall-rs.github.io/post/fjall-2-1
42-
/// Concurrent cache implementation
43-
data: Cache<CacheKey, Item, BlobWeighter, rustc_hash::FxBuildHasher>,
44-
45-
/// Capacity in bytes
46-
capacity: u64,
47-
}
48-
49-
impl std::fmt::Debug for BlobCache {
50-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51-
write!(f, "BlobCache<cap: {} bytes>", self.capacity)
52-
}
53-
}
54-
55-
impl BlobCache {
56-
/// Creates a new block cache with roughly `n` bytes of capacity.
57-
#[must_use]
58-
pub fn with_capacity_bytes(bytes: u64) -> Self {
59-
use quick_cache::sync::DefaultLifecycle;
60-
61-
#[allow(clippy::default_trait_access)]
62-
let quick_cache = Cache::with(
63-
10_000,
64-
bytes,
65-
BlobWeighter,
66-
Default::default(),
67-
DefaultLifecycle::default(),
68-
);
69-
70-
Self {
71-
data: quick_cache,
72-
capacity: bytes,
73-
}
74-
}
75-
76-
pub(crate) fn insert(&self, key: CacheKey, value: UserValue) {
77-
self.data.insert(key, value);
78-
}
79-
80-
pub(crate) fn get(&self, vlog_id: ValueLogId, vhandle: &ValueHandle) -> Option<Item> {
81-
self.data.get(&(vlog_id, vhandle))
82-
}
83-
84-
/// Returns the cache capacity in bytes.
85-
#[must_use]
86-
pub fn capacity(&self) -> u64 {
87-
self.capacity
88-
}
89-
90-
/// Returns the size in bytes.
91-
#[must_use]
92-
pub fn size(&self) -> u64 {
93-
self.data.weight()
94-
}
95-
96-
/// Returns the number of cached blocks.
97-
#[must_use]
98-
pub fn len(&self) -> usize {
99-
self.data.len()
100-
}
12+
pub trait BlobCache: Clone {
13+
/// Caches a blob.
14+
fn insert(&self, vlog_id: ValueLogId, vhandle: &ValueHandle, value: UserValue);
10115

102-
/// Returns `true` if there are no cached blocks.
103-
#[must_use]
104-
pub fn is_empty(&self) -> bool {
105-
self.len() == 0
106-
}
16+
/// Retrieves a blob from the cache, or `None` if it could not be found.
17+
fn get(&self, vlog_id: ValueLogId, vhandle: &ValueHandle) -> Option<UserValue>;
10718
}

src/config.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,29 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{blob_cache::BlobCache, compression::Compressor};
6-
use std::sync::Arc;
76

87
/// Value log configuration
9-
pub struct Config<C: Compressor + Clone> {
8+
pub struct Config<BC: BlobCache, C: Compressor + Clone> {
109
/// Target size of vLog segments
1110
pub(crate) segment_size_bytes: u64,
1211

1312
/// Blob cache to use
14-
pub(crate) blob_cache: Arc<BlobCache>,
13+
pub(crate) blob_cache: BC,
1514

1615
/// Compression to use
1716
pub(crate) compression: C,
1817
}
1918

20-
impl<C: Compressor + Clone + Default> Default for Config<C> {
21-
fn default() -> Self {
19+
impl<BC: BlobCache, C: Compressor + Clone + Default> Config<BC, C> {
20+
/// Creates a new configuration builder.
21+
pub fn new(blob_cache: BC) -> Self {
2222
Self {
23-
segment_size_bytes: /* 256 MiB */ 256 * 1_024 * 1_024,
24-
blob_cache: Arc::new(BlobCache::with_capacity_bytes(
25-
/* 16 MiB */ 16 * 1_024 * 1_024,
26-
)),
27-
compression: C::default(),
23+
blob_cache,
24+
compression: Default::default(),
25+
segment_size_bytes: 128 * 1_024 * 1_024,
2826
}
2927
}
30-
}
3128

32-
impl<C: Compressor + Clone> Config<C> {
3329
/// Sets the compression & decompression scheme.
3430
#[must_use]
3531
pub fn compression(mut self, compressor: C) -> Self {
@@ -41,10 +37,8 @@ impl<C: Compressor + Clone> Config<C> {
4137
///
4238
/// You can create a global [`BlobCache`] and share it between multiple
4339
/// value logs to cap global cache memory usage.
44-
///
45-
/// Defaults to a blob cache with 16 MiB of capacity *per value log*.
4640
#[must_use]
47-
pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
41+
pub fn blob_cache(mut self, blob_cache: BC) -> Self {
4842
self.blob_cache = blob_cache;
4943
self
5044
}

0 commit comments

Comments
 (0)