Skip to content

Commit 2a90f2a

Browse files
authored
Merge pull request #118 from fjall-rs/fix/lockless-ranges
2.7.2
2 parents b0e10fc + 08caf84 commit 2a90f2a

File tree

17 files changed

+175
-93
lines changed

17 files changed

+175
-93
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "lsm-tree"
33
description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)"
44
license = "MIT OR Apache-2.0"
5-
version = "2.7.1"
5+
version = "2.7.2"
66
edition = "2021"
77
rust-version = "1.75.0"
88
readme = "README.md"

src/abstract.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ pub trait AbstractTree {
4747
fn register_segments(&self, segments: &[Segment]) -> crate::Result<()>;
4848

4949
/// Write-locks the active memtable for exclusive access
50-
fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Memtable>;
50+
fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>>;
51+
52+
/// Clears the active memtable atomically.
53+
fn clear_active_memtable(&self);
5154

5255
/// Sets the active memtable.
5356
///

src/blob_tree/gc/reader.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,24 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{blob_tree::value::MaybeInlineValue, coding::Decode, Memtable};
6-
use std::{io::Cursor, sync::RwLockWriteGuard};
6+
use std::io::Cursor;
77
use value_log::ValueHandle;
88

99
#[allow(clippy::module_name_repetitions)]
1010
pub struct GcReader<'a> {
1111
tree: &'a crate::Tree,
12-
memtable: &'a RwLockWriteGuard<'a, Memtable>,
12+
memtable: &'a Memtable,
1313
}
1414

1515
impl<'a> GcReader<'a> {
16-
pub fn new(tree: &'a crate::Tree, memtable: &'a RwLockWriteGuard<'a, Memtable>) -> Self {
16+
pub fn new(tree: &'a crate::Tree, memtable: &'a Memtable) -> Self {
1717
Self { tree, memtable }
1818
}
1919

2020
fn get_internal(&self, key: &[u8]) -> crate::Result<Option<MaybeInlineValue>> {
2121
let Some(item) = self
2222
.tree
23-
.get_internal_entry_with_lock(self.memtable, key, None)?
23+
.get_internal_entry_with_memtable(self.memtable, key, None)?
2424
.map(|x| x.value)
2525
else {
2626
return Ok(None);

src/blob_tree/gc/writer.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ use crate::{
66
blob_tree::value::MaybeInlineValue, coding::Encode, value::InternalValue, Memtable, SeqNo,
77
UserKey,
88
};
9-
use std::sync::RwLockWriteGuard;
109
use value_log::ValueHandle;
1110

1211
#[allow(clippy::module_name_repetitions)]
1312
pub struct GcWriter<'a> {
1413
seqno: SeqNo,
1514
buffer: Vec<(UserKey, ValueHandle, u32)>,
16-
memtable: &'a RwLockWriteGuard<'a, Memtable>,
15+
memtable: &'a Memtable,
1716
}
1817

1918
impl<'a> GcWriter<'a> {
20-
pub fn new(seqno: SeqNo, memtable: &'a RwLockWriteGuard<'a, Memtable>) -> Self {
19+
pub fn new(seqno: SeqNo, memtable: &'a Memtable) -> Self {
2120
Self {
2221
seqno,
2322
memtable,

src/blob_tree/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ impl BlobTree {
230230
}
231231

232232
impl AbstractTree for BlobTree {
233+
fn clear_active_memtable(&self) {
234+
self.index.clear_active_memtable();
235+
}
236+
233237
fn l0_run_count(&self) -> usize {
234238
self.index.l0_run_count()
235239
}
@@ -411,7 +415,7 @@ impl AbstractTree for BlobTree {
411415
Ok(())
412416
}
413417

414-
fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Memtable> {
418+
fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
415419
self.index.lock_active_memtable()
416420
}
417421

src/compaction/fifo.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ mod tests {
133133
time::unix_timestamp,
134134
HashSet, KeyRange,
135135
};
136-
use std::sync::Arc;
136+
use std::sync::{atomic::AtomicBool, Arc};
137137
use test_log::test;
138138

139139
#[allow(clippy::expect_used)]
@@ -180,6 +180,9 @@ mod tests {
180180
block_cache,
181181

182182
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
183+
184+
path: "a".into(),
185+
is_deleted: AtomicBool::default(),
183186
}
184187
.into()
185188
}

src/compaction/leveled.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,10 @@ mod tests {
397397
time::unix_timestamp,
398398
Config, HashSet, KeyRange,
399399
};
400-
use std::{path::Path, sync::Arc};
400+
use std::{
401+
path::Path,
402+
sync::{atomic::AtomicBool, Arc},
403+
};
401404
use test_log::test;
402405

403406
fn string_key_range(a: &str, b: &str) -> KeyRange {
@@ -456,6 +459,9 @@ mod tests {
456459
block_cache,
457460

458461
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
462+
463+
path: "a".into(),
464+
is_deleted: AtomicBool::default(),
459465
}
460466
.into()
461467
}

src/compaction/maintenance.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ mod tests {
9797
},
9898
KeyRange,
9999
};
100-
use std::sync::Arc;
100+
use std::sync::{atomic::AtomicBool, Arc};
101101
use test_log::test;
102102

103103
#[allow(clippy::expect_used)]
@@ -143,6 +143,9 @@ mod tests {
143143
block_cache,
144144

145145
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
146+
147+
path: "a".into(),
148+
is_deleted: AtomicBool::default(),
146149
}
147150
.into()
148151
}

src/compaction/tiered.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ mod tests {
168168
},
169169
HashSet, KeyRange, SeqNo,
170170
};
171-
use std::sync::Arc;
171+
use std::sync::{atomic::AtomicBool, Arc};
172172
use test_log::test;
173173

174174
#[allow(clippy::expect_used)]
@@ -214,6 +214,9 @@ mod tests {
214214
block_cache,
215215

216216
bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
217+
218+
path: "a".into(),
219+
is_deleted: AtomicBool::default(),
217220
}
218221
.into()
219222
}

src/compaction/worker.rs

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ use crate::{
2424
};
2525
use std::{
2626
path::Path,
27-
sync::{atomic::AtomicU64, Arc, RwLock, RwLockWriteGuard},
27+
sync::{
28+
atomic::{AtomicBool, AtomicU64},
29+
Arc, RwLock, RwLockWriteGuard,
30+
},
2831
time::Instant,
2932
};
3033

@@ -211,6 +214,19 @@ fn merge_segments(
211214
return Ok(());
212215
}
213216

217+
let Some(segments) = payload
218+
.segment_ids
219+
.iter()
220+
.map(|&id| levels.get_segment(id))
221+
.collect::<Option<Vec<_>>>()
222+
else {
223+
log::warn!(
224+
"Compaction task created by {:?} contained segments not referenced in the level manifest",
225+
opts.strategy.get_name(),
226+
);
227+
return Ok(());
228+
};
229+
214230
let segments_base_folder = opts.config.path.join(SEGMENTS_FOLDER);
215231

216232
log::debug!(
@@ -381,6 +397,8 @@ fn merge_segments(
381397
let block_index = Arc::new(block_index);
382398

383399
Ok(SegmentInner {
400+
path: segment_file_path.to_path_buf(),
401+
384402
tree_id: opts.tree_id,
385403

386404
descriptor_table: opts.config.descriptor_table.clone(),
@@ -393,6 +411,8 @@ fn merge_segments(
393411
block_index,
394412

395413
bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,
414+
415+
is_deleted: AtomicBool::default(),
396416
}
397417
.into())
398418
})
@@ -412,11 +432,7 @@ fn merge_segments(
412432
// NOTE: Mind lock order L -> M -> S
413433
log::trace!("compactor: acquiring levels manifest write lock");
414434
let mut levels = opts.levels.write().expect("lock is poisoned");
415-
416-
// IMPORTANT: Write lock memtable(s), otherwise segments may get deleted while a range read is happening
417-
// NOTE: Mind lock order L -> M -> S
418-
log::trace!("compactor: acquiring sealed memtables write lock");
419-
let sealed_memtables_guard = opts.sealed_memtables.write().expect("lock is poisoned");
435+
log::trace!("compactor: acquired levels manifest write lock");
420436

421437
let swap_result = levels.atomic_swap(|recipe| {
422438
for segment in created_segments.iter().cloned() {
@@ -451,19 +467,11 @@ fn merge_segments(
451467
.insert(&segment_file_path, segment.global_id());
452468
}
453469

454-
// NOTE: Segments are registered, we can unlock the memtable(s) safely
455-
drop(sealed_memtables_guard);
456-
457470
// NOTE: If the application were to crash >here< it's fine
458471
// The segments are not referenced anymore, and will be
459472
// cleaned up upon recovery
460-
for segment_id in &payload.segment_ids {
461-
let segment_file_path = segments_base_folder.join(segment_id.to_string());
462-
log::trace!("Removing old segment at {segment_file_path:?}");
463-
464-
if let Err(e) = std::fs::remove_file(segment_file_path) {
465-
log::error!("Failed to cleanup file of deleted segment: {e:?}");
466-
}
473+
for segment in segments {
474+
segment.mark_as_deleted();
467475
}
468476

469477
// NOTE: Unlock level manifest before clearing old file descriptors
@@ -476,17 +484,6 @@ fn merge_segments(
476484
levels.show_segments(payload.segment_ids.iter().copied());
477485
drop(levels);
478486

479-
log::trace!(
480-
"Closing file handles for old segment files: {:?}",
481-
payload.segment_ids
482-
);
483-
484-
for segment_id in &payload.segment_ids {
485-
opts.config
486-
.descriptor_table
487-
.remove((opts.tree_id, *segment_id).into());
488-
}
489-
490487
log::trace!("Compaction successful");
491488

492489
Ok(())
@@ -506,11 +503,17 @@ fn drop_segments(
506503
return Ok(());
507504
}
508505

509-
let segments_base_folder = opts.config.path.join(SEGMENTS_FOLDER);
510-
511-
// IMPORTANT: Write lock memtable, otherwise segments may get deleted while a range read is happening
512-
log::trace!("Acquiring sealed memtables write lock");
513-
let memtable_lock = opts.sealed_memtables.write().expect("lock is poisoned");
506+
let Some(segments) = segment_ids
507+
.iter()
508+
.map(|id| levels.get_segment(id.segment_id()))
509+
.collect::<Option<Vec<_>>>()
510+
else {
511+
log::warn!(
512+
"Compaction task created by {:?} contained segments not referenced in the level manifest",
513+
opts.strategy.get_name(),
514+
);
515+
return Ok(());
516+
};
514517

515518
// IMPORTANT: Write the level manifest with the removed segments first
516519
// Otherwise the segment file is deleted, but the segment is still referenced!
@@ -525,26 +528,13 @@ fn drop_segments(
525528
}
526529
})?;
527530

528-
drop(memtable_lock);
529531
drop(levels);
530532

531533
// NOTE: If the application were to crash >here< it's fine
532534
// The segments are not referenced anymore, and will be
533535
// cleaned up upon recovery
534-
for key in segment_ids {
535-
let segment_id = key.segment_id();
536-
537-
let segment_file_path = segments_base_folder.join(segment_id.to_string());
538-
log::trace!("Removing old segment at {segment_file_path:?}");
539-
540-
if let Err(e) = std::fs::remove_file(segment_file_path) {
541-
log::error!("Failed to cleanup file of deleted segment: {e:?}");
542-
}
543-
}
544-
545-
for key in segment_ids {
546-
log::trace!("Closing file handles for segment data file");
547-
opts.config.descriptor_table.remove(*key);
536+
for segment in segments {
537+
segment.mark_as_deleted();
548538
}
549539

550540
log::trace!("Dropped {} segments", segment_ids.len());

src/level_manifest/level.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ mod tests {
257257
},
258258
AbstractTree, KeyRange, Slice,
259259
};
260-
use std::sync::Arc;
260+
use std::sync::{atomic::AtomicBool, Arc};
261261
use test_log::test;
262262

263263
#[allow(clippy::expect_used)]
@@ -303,6 +303,9 @@ mod tests {
303303
block_cache,
304304

305305
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
306+
307+
path: "a".into(),
308+
is_deleted: AtomicBool::default(),
306309
}
307310
.into()
308311
}

src/level_manifest/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,15 @@ impl LevelManifest {
378378
output
379379
}
380380

381+
pub(crate) fn get_segment(&self, id: SegmentId) -> Option<Segment> {
382+
for level in &self.levels {
383+
if let Some(segment) = level.segments.iter().find(|x| x.id() == id).cloned() {
384+
return Some(segment);
385+
}
386+
}
387+
None
388+
}
389+
381390
/// Returns a view into the levels, hiding all segments that currently are being compacted
382391
#[must_use]
383392
pub fn resolved_view(&self) -> Vec<Level> {

0 commit comments

Comments
 (0)