Skip to content

Commit cfc0349

Browse files
perf(compaction): avoid duplicate data in LSM (risingwavelabs#8489)
1 parent ad7e21b commit cfc0349

File tree

4 files changed

+118
-39
lines changed

4 files changed

+118
-39
lines changed

src/storage/src/hummock/compactor/iterator.rs

Lines changed: 85 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ struct SstableStreamIterator {
4343
/// Counts the time used for IO.
4444
stats_ptr: Arc<AtomicU64>,
4545

46-
// For debugging
46+
/// For key sanity check of divided SST and debugging
4747
sstable_info: SstableInfo,
4848
}
4949

@@ -77,6 +77,22 @@ impl SstableStreamIterator {
7777
}
7878
}
7979

80+
async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
81+
while let Some(block_iter) = self.block_iter.as_mut() {
82+
if self
83+
.sstable_info
84+
.get_table_ids()
85+
.binary_search(&block_iter.table_id().table_id)
86+
.is_ok()
87+
{
88+
return Ok(());
89+
} else {
90+
self.next_block().await?;
91+
}
92+
}
93+
Ok(())
94+
}
95+
8096
/// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
8197
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
8298
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
@@ -98,7 +114,7 @@ impl SstableStreamIterator {
98114
}
99115
}
100116

101-
Ok(())
117+
self.prune_from_valid_block_iter().await
102118
}
103119

104120
/// Loads a new block, creates a new iterator for it, and stores that iterator in
@@ -147,6 +163,7 @@ impl SstableStreamIterator {
147163
block_iter.next();
148164
if !block_iter.is_valid() {
149165
self.next_block().await?;
166+
self.prune_from_valid_block_iter().await?;
150167
}
151168

152169
Ok(())
@@ -226,11 +243,12 @@ impl ConcatSstableIterator {
226243
/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
227244
async fn seek_idx(
228245
&mut self,
229-
idx: usize,
246+
mut idx: usize,
230247
seek_key: Option<FullKey<&[u8]>>,
231248
) -> HummockResult<()> {
232249
self.sstable_iter.take();
233-
let seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty()) {
250+
let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
251+
{
234252
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
235253
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
236254
Ordering::Greater => Some(seek_key),
@@ -240,14 +258,14 @@ impl ConcatSstableIterator {
240258
(None, false) => Some(FullKey::decode(&self.key_range.left)),
241259
};
242260

243-
if idx < self.tables.len() {
261+
while idx < self.tables.len() {
244262
let table_info = &self.tables[idx];
245263
let table = self
246264
.sstable_store
247265
.sstable(table_info, &mut self.stats)
248266
.await?;
249267
let block_metas = &table.value().meta.block_metas;
250-
let start_index = match seek_key {
268+
let mut start_index = match seek_key {
251269
None => 0,
252270
Some(seek_key) => {
253271
// start_index points to the greatest block whose smallest_key <= seek_key.
@@ -268,32 +286,61 @@ impl ConcatSstableIterator {
268286
) != Ordering::Greater
269287
})
270288
};
271-
if end_index <= start_index {
272-
return Ok(());
273-
}
274-
275-
let stats_ptr = self.stats.remote_io_time.clone();
276-
let now = Instant::now();
277-
278-
let block_stream = self
279-
.sstable_store
280-
.get_stream(table.value(), Some(start_index))
281-
.await?;
282-
283-
// Determine time needed to open stream.
284-
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
285-
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
286289

287-
let mut sstable_iter = SstableStreamIterator::new(
288-
table_info,
289-
block_stream,
290-
end_index - start_index,
291-
&self.stats,
292-
);
293-
sstable_iter.seek(seek_key).await?;
290+
while start_index < end_index {
291+
let start_block_table_id = block_metas[start_index].table_id();
292+
if table_info
293+
.get_table_ids()
294+
.binary_search(&start_block_table_id.table_id)
295+
.is_ok()
296+
{
297+
break;
298+
} else {
299+
start_index +=
300+
&block_metas[(start_index + 1)..].partition_point(|block_meta| {
301+
block_meta.table_id() == start_block_table_id
302+
}) + 1;
303+
}
304+
}
294305

295-
self.sstable_iter = Some(sstable_iter);
306+
let found = if end_index <= start_index {
307+
false
308+
} else {
309+
let stats_ptr = self.stats.remote_io_time.clone();
310+
let now = Instant::now();
311+
312+
let block_stream = self
313+
.sstable_store
314+
.get_stream(table.value(), Some(start_index))
315+
.await?;
316+
317+
// Determine time needed to open stream.
318+
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
319+
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
320+
321+
let mut sstable_iter = SstableStreamIterator::new(
322+
table_info,
323+
block_stream,
324+
end_index - start_index,
325+
&self.stats,
326+
);
327+
sstable_iter.seek(seek_key).await?;
328+
329+
if sstable_iter.is_valid() {
330+
self.sstable_iter = Some(sstable_iter);
331+
true
332+
} else {
333+
false
334+
}
335+
};
296336
self.cur_idx = idx;
337+
338+
if found {
339+
return Ok(());
340+
} else {
341+
idx += 1;
342+
seek_key = None;
343+
}
297344
}
298345
Ok(())
299346
}
@@ -383,7 +430,8 @@ mod tests {
383430
use crate::hummock::iterator::test_utils::mock_sstable_store;
384431
use crate::hummock::iterator::HummockIterator;
385432
use crate::hummock::test_utils::{
386-
default_builder_opt_for_test, gen_test_sstable, test_key_of, test_value_of, TEST_KEYS_COUNT,
433+
default_builder_opt_for_test, gen_test_sstable_and_info, test_key_of, test_value_of,
434+
TEST_KEYS_COUNT,
387435
};
388436
use crate::hummock::value::HummockValue;
389437

@@ -394,15 +442,15 @@ mod tests {
394442
for object_id in 0..3 {
395443
let start_index = object_id * TEST_KEYS_COUNT;
396444
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
397-
let table = gen_test_sstable(
445+
let (_table, table_info) = gen_test_sstable_and_info(
398446
default_builder_opt_for_test(),
399447
object_id as u64,
400448
(start_index..end_index)
401449
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
402450
sstable_store.clone(),
403451
)
404452
.await;
405-
table_infos.push(table.get_sstable_info());
453+
table_infos.push(table_info);
406454
}
407455
let start_index = 5000;
408456
let end_index = 25000;
@@ -494,15 +542,15 @@ mod tests {
494542
for object_id in 0..3 {
495543
let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
496544
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
497-
let table = gen_test_sstable(
545+
let (_table, table_info) = gen_test_sstable_and_info(
498546
default_builder_opt_for_test(),
499547
object_id as u64,
500548
(start_index..end_index)
501549
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
502550
sstable_store.clone(),
503551
)
504552
.await;
505-
table_infos.push(table.get_sstable_info());
553+
table_infos.push(table_info);
506554
}
507555

508556
// Test seek_idx. Result is dominated by given seek key rather than key range.
@@ -536,7 +584,9 @@ mod tests {
536584
let block_1_second_key = iter.key().to_vec();
537585
// Use a big enough seek key and result in invalid iterator.
538586
let seek_key = test_key_of(30001);
539-
iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap();
587+
iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
588+
.await
589+
.unwrap();
540590
assert!(!iter.is_valid());
541591

542592
// Test seek_idx. Result is dominated by key range rather than given seek key.

src/storage/src/hummock/sstable/block_iterator.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::cmp::Ordering;
1616
use std::ops::Range;
1717

1818
use bytes::BytesMut;
19+
use risingwave_common::catalog::TableId;
1920
use risingwave_hummock_sdk::key::FullKey;
2021

2122
use super::{KeyPrefix, LenType, RestartPoint};
@@ -74,10 +75,14 @@ impl BlockIterator {
7475
self.try_prev_inner()
7576
}
7677

78+
pub fn table_id(&self) -> TableId {
79+
self.block.table_id()
80+
}
81+
7782
pub fn key(&self) -> FullKey<&[u8]> {
7883
assert!(self.is_valid());
7984

80-
FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..])
85+
FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
8186
}
8287

8388
pub fn value(&self) -> &[u8] {

src/storage/src/hummock/sstable/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use bytes::{Buf, BufMut};
4040
pub use forward_sstable_iterator::*;
4141
mod backward_sstable_iterator;
4242
pub use backward_sstable_iterator::*;
43-
use risingwave_hummock_sdk::key::{KeyPayloadType, TableKey, UserKey};
43+
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey};
4444
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
4545
#[cfg(test)]
4646
use risingwave_pb::hummock::{KeyRange, SstableInfo};
@@ -253,6 +253,10 @@ impl BlockMeta {
253253
pub fn encoded_size(&self) -> usize {
254254
16 /* offset + len + key len + uncompressed size */ + self.smallest_key.len()
255255
}
256+
257+
pub fn table_id(&self) -> TableId {
258+
FullKey::decode(&self.smallest_key).user_key.table_id
259+
}
256260
}
257261

258262
#[derive(Clone, PartialEq, Eq, Debug)]

src/storage/src/hummock/test_utils.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
204204
range_tombstones: Vec<DeleteRangeTombstone>,
205205
sstable_store: SstableStoreRef,
206206
policy: CachePolicy,
207-
) -> Sstable {
207+
) -> (Sstable, SstableInfo) {
208208
let writer_opts = SstableWriterOptions {
209209
capacity_hint: None,
210210
tracker: None,
@@ -227,7 +227,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
227227
)
228228
.await
229229
.unwrap();
230-
table.value().as_ref().clone()
230+
(table.value().as_ref().clone(), output.sst_info.sst_info)
231231
}
232232

233233
/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
@@ -246,6 +246,25 @@ pub async fn gen_test_sstable<B: AsRef<[u8]>>(
246246
CachePolicy::NotFill,
247247
)
248248
.await
249+
.0
250+
}
251+
252+
/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
253+
pub async fn gen_test_sstable_and_info<B: AsRef<[u8]>>(
254+
opts: SstableBuilderOptions,
255+
object_id: HummockSstableObjectId,
256+
kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
257+
sstable_store: SstableStoreRef,
258+
) -> (Sstable, SstableInfo) {
259+
gen_test_sstable_inner(
260+
opts,
261+
object_id,
262+
kv_iter,
263+
vec![],
264+
sstable_store,
265+
CachePolicy::NotFill,
266+
)
267+
.await
249268
}
250269

251270
/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
@@ -265,6 +284,7 @@ pub async fn gen_test_sstable_with_range_tombstone(
265284
CachePolicy::NotFill,
266285
)
267286
.await
287+
.0
268288
}
269289

270290
/// Generates a user key with table id 0 and the given `table_key`

0 commit comments

Comments
 (0)