Skip to content

feat(storage): sepearte intra picker #12147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 90 additions & 44 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::picker::{
CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
TtlReclaimCompactionPicker,
CompactionTaskValidator, IntraCompactionPicker, SpaceReclaimCompactionPicker,
SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker,
};
use super::{
create_compaction_task, LevelCompactionPicker, ManualCompactionOption, ManualCompactionPicker,
Expand All @@ -44,6 +44,23 @@ use crate::rpc::metrics::MetaMetrics;

pub const SCORE_BASE: u64 = 100;

#[derive(Debug, Default, Clone)]
pub enum PickerType {
Tier,
Intra,
ToBase,
#[default]
BottomLevel,
}

#[derive(Default, Debug)]
pub struct PickerInfo {
score: u64,
select_level: usize,
target_level: usize,
picker_type: PickerType,
}

pub trait LevelSelector: Sync + Send {
fn pick_compaction(
&mut self,
Expand Down Expand Up @@ -71,7 +88,7 @@ pub struct SelectContext {
// size of the files in `base_level` reaches its capacity, we will place data in a higher
// level, which equals to `base_level -= 1;`.
pub base_level: usize,
pub score_levels: Vec<(u64, usize, usize)>,
pub score_levels: Vec<PickerInfo>,
}

pub struct DynamicLevelSelectorCore {
Expand All @@ -92,33 +109,34 @@ impl DynamicLevelSelectorCore {

fn create_compaction_picker(
&self,
select_level: usize,
target_level: usize,
picker_info: &PickerInfo,
overlap_strategy: Arc<dyn OverlapStrategy>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Box<dyn CompactionPicker> {
if select_level == 0 {
if target_level == 0 {
Box::new(TierCompactionPicker::new_with_validator(
self.config.clone(),
compaction_task_validator,
))
} else {
Box::new(LevelCompactionPicker::new_with_validator(
target_level,
self.config.clone(),
compaction_task_validator,
match picker_info.picker_type {
PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator(
self.config.clone(),
compaction_task_validator,
)),
PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator(
picker_info.target_level,
self.config.clone(),
compaction_task_validator,
)),
PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator(
self.config.clone(),
compaction_task_validator,
)),
PickerType::BottomLevel => {
assert_eq!(picker_info.select_level + 1, picker_info.target_level);
Box::new(MinOverlappingPicker::new(
picker_info.select_level,
picker_info.target_level,
self.config.max_bytes_for_level_base,
self.config.split_by_state_table,
overlap_strategy,
))
}
} else {
assert_eq!(select_level + 1, target_level);
Box::new(MinOverlappingPicker::new(
select_level,
target_level,
self.config.max_bytes_for_level_base,
self.config.split_by_state_table,
overlap_strategy,
))
}
}

Expand Down Expand Up @@ -217,8 +235,12 @@ impl DynamicLevelSelectorCore {
std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
/ self.config.level0_tier_compact_file_number;
// Reduce the level num of l0 overlapping sub_level
ctx.score_levels
.push((std::cmp::max(l0_overlapping_score, SCORE_BASE + 1), 0, 0));
ctx.score_levels.push(PickerInfo {
score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1),
select_level: 0,
target_level: 0,
picker_type: PickerType::Tier,
})
}

// The read query at the non-overlapping level only selects ssts that match the query
Expand Down Expand Up @@ -254,8 +276,24 @@ impl DynamicLevelSelectorCore {
};

// Reduce the level num of l0 non-overlapping sub_level
ctx.score_levels
.push((non_overlapping_score, 0, ctx.base_level));
ctx.score_levels.push({
PickerInfo {
score: non_overlapping_score,
select_level: 0,
target_level: ctx.base_level,
picker_type: PickerType::ToBase,
}
});

// FIXME: more accurate score calculation algorithm will be introduced (#11903)
ctx.score_levels.push({
PickerInfo {
score: non_overlapping_score,
select_level: 0,
target_level: 0,
picker_type: PickerType::Intra,
}
});
}

// The bottommost level can not be input level.
Expand All @@ -275,16 +313,23 @@ impl DynamicLevelSelectorCore {
if total_size == 0 {
continue;
}
ctx.score_levels.push((
total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
level_idx,
level_idx + 1,
));

ctx.score_levels.push({
PickerInfo {
score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
select_level: level_idx,
target_level: level_idx + 1,
picker_type: PickerType::BottomLevel,
}
});
}

// sort reverse to pick the largest one.
ctx.score_levels
.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| a.2.cmp(&b.2)));
ctx.score_levels.sort_by(|a, b| {
b.score
.cmp(&a.score)
.then_with(|| a.target_level.cmp(&b.target_level))
});
ctx
}

Expand Down Expand Up @@ -379,21 +424,20 @@ impl LevelSelector for DynamicLevelSelector {
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);

// TODO: Determine which rule to enable by write limit
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
for (score, select_level, target_level) in ctx.score_levels {
if score <= SCORE_BASE {
for picker_info in &ctx.score_levels {
if picker_info.score <= SCORE_BASE {
return None;
}
let mut picker = dynamic_level_core.create_compaction_picker(
select_level,
target_level,
picker_info,
overlap_strategy.clone(),
compaction_task_validator.clone(),
);

let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
ret.add_pending_task(task_id, level_handlers);
Expand All @@ -404,9 +448,11 @@ impl LevelSelector for DynamicLevelSelector {
self.task_type(),
));
}
selector_stats
.skip_picker
.push((select_level, target_level, stats));
selector_stats.skip_picker.push((
picker_info.select_level,
picker_info.target_level,
stats,
));
}
None
}
Expand Down
Loading