
refactor(storage): separate validator logic from picker #11984


Merged: 13 commits, Sep 7, 2023
Changes from 6 commits
17 changes: 14 additions & 3 deletions src/meta/src/hummock/compaction/level_selector.rs
@@ -26,7 +26,7 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::picker::{
SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
TtlReclaimCompactionPicker,
};
use super::{
@@ -95,14 +95,19 @@ impl DynamicLevelSelectorCore {
select_level: usize,
target_level: usize,
overlap_strategy: Arc<dyn OverlapStrategy>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Box<dyn CompactionPicker> {
if select_level == 0 {
if target_level == 0 {
Box::new(TierCompactionPicker::new(self.config.clone()))
Box::new(TierCompactionPicker::new_with_validator(
self.config.clone(),
compaction_task_validator,
))
} else {
Box::new(LevelCompactionPicker::new(
Box::new(LevelCompactionPicker::new_with_validator(
target_level,
self.config.clone(),
compaction_task_validator,
))
}
} else {
@@ -374,6 +379,11 @@ impl LevelSelector for DynamicLevelSelector {
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);

// TODO: Determine which rule to enable by write limit
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
for (score, select_level, target_level) in ctx.score_levels {
if score <= SCORE_BASE {
return None;
Expand All @@ -382,6 +392,7 @@ impl LevelSelector for DynamicLevelSelector {
select_level,
target_level,
overlap_strategy.clone(),
compaction_task_validator.clone(),
);
let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
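For readers skimming the diff: the level_selector.rs hunk above builds one CompactionTaskValidator per selection pass and hands the same Arc to every picker it creates. Below is a minimal, self-contained Rust sketch of that sharing pattern only; the struct fields and the within_budget check are simplified stand-ins assumed for illustration, not the real RisingWave types or rules.

use std::sync::Arc;

// Simplified stand-ins for the real CompactionConfig / CompactionTaskValidator.
struct CompactionConfig {
    max_compaction_bytes: u64,
}

struct CompactionTaskValidator {
    config: Arc<CompactionConfig>,
}

impl CompactionTaskValidator {
    fn new(config: Arc<CompactionConfig>) -> Self {
        Self { config }
    }

    // Placeholder for the real write-amplification / file-count rules.
    fn within_budget(&self, task_bytes: u64) -> bool {
        task_bytes <= self.config.max_compaction_bytes
    }
}

struct LevelCompactionPicker {
    target_level: usize,
    validator: Arc<CompactionTaskValidator>,
}

impl LevelCompactionPicker {
    fn pick(&self, candidate_bytes: u64) -> Option<usize> {
        // The picker only assembles the candidate; accepting or skipping it
        // is delegated to the shared validator.
        self.validator
            .within_budget(candidate_bytes)
            .then_some(self.target_level)
    }
}

fn main() {
    let config = Arc::new(CompactionConfig { max_compaction_bytes: 512 });
    // Built once per selection pass, cloned cheaply for each picker.
    let validator = Arc::new(CompactionTaskValidator::new(config));
    let picker = LevelCompactionPicker {
        target_level: 1,
        validator: Arc::clone(&validator),
    };
    println!("{:?}", picker.pick(256)); // Some(1)
    println!("{:?}", picker.pick(1024)); // None
}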
126 changes: 45 additions & 81 deletions src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs
@@ -20,14 +20,18 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel};

use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
use super::{
CompactionInput, CompactionPicker, CompactionTaskOptimizeRule, CompactionTaskValidator,
LocalPickerStatistic,
};
use crate::hummock::compaction::create_overlap_strategy;
use crate::hummock::compaction::picker::TrivialMovePicker;
use crate::hummock::level_handler::LevelHandler;

pub struct LevelCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
}

impl CompactionPicker for LevelCompactionPicker {
@@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker {
}

impl LevelCompactionPicker {
#[cfg(test)]
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
config,
}
}

pub fn new_with_validator(
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
config,
compaction_task_validator,
}
}

fn pick_base_trivial_move(
&self,
l0: &OverlappingLevel,
@@ -117,21 +135,6 @@
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size();
Review comment (Contributor): Will this affect the strategy?

Author reply: This may affect performance, but there is no correctness issue and I wanted to prioritize simplifying the code.

Follow-up: And will there be a new base level picker strategy?

let base_level_size = target_level.total_file_size
- level_handlers[target_level.level_idx as usize].get_pending_file_size();

if l0_size < base_level_size {
stats.skip_by_write_amp_limit += 1;
return None;
}

// no running base_compaction
let strict_check = level_handlers[0]
.get_pending_tasks()
.iter()
.any(|task| task.target_level != 0);

let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
Expand All @@ -157,7 +160,7 @@ impl LevelCompactionPicker {

let mut skip_by_pending = false;
let mut input_levels = vec![];
let mut min_write_amp_meet = false;

for input in l0_select_tables_vec {
let l0_select_tables = input
.sstable_infos
Expand All @@ -184,16 +187,6 @@ impl LevelCompactionPicker {
continue;
}

// The size of target level may be too large, we shall skip this compact task and wait
// the data in base level compact to lower level.
if target_level_size > self.config.max_compaction_bytes && strict_check {
continue;
}

if input.total_file_size >= target_level_size {
min_write_amp_meet = true;
}

input_levels.push((input, target_level_size, target_level_ssts));
}

Expand All @@ -204,20 +197,7 @@ impl LevelCompactionPicker {
return None;
}

if !min_write_amp_meet && strict_check {
// If the write-amplification of all candidate task are large, we may hope to wait base
// level compact more data to lower level. But if we skip all task, I'm
// afraid the data will be blocked in level0 and will be never compacted to base level.
// So we only allow one task exceed write-amplification-limit running in
// level0 to base-level.
return None;
}

for (input, target_file_size, target_level_files) in input_levels {
if min_write_amp_meet && input.total_file_size < target_file_size {
continue;
}

for (input, _target_file_size, target_level_files) in input_levels {
let mut select_level_inputs = input
.sstable_infos
.into_iter()
Expand All @@ -233,11 +213,22 @@ impl LevelCompactionPicker {
level_type: target_level.level_type,
table_infos: target_level_files,
});
return Some(CompactionInput {

let result = CompactionInput {
input_levels: select_level_inputs,
target_level: self.target_level,
target_sub_level_id: 0,
});
};

if !self.compaction_task_validator.valid_compact_task(
&result,
CompactionTaskOptimizeRule::ToBase,
stats,
) {
continue;
}

return Some(result);
}
stats.skip_by_write_amp_limit += 1;
Review comment (Collaborator): Do we still need to update the stats here?

None
@@ -267,8 +258,6 @@ impl LevelCompactionPicker {
self.config.sub_level_max_compaction_bytes,
);

let tier_sub_level_compact_level_count =
self.config.level0_sub_level_compact_level_count as usize;
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
self.config.sub_level_max_compaction_bytes / 2,
max_compaction_bytes,
@@ -284,18 +273,8 @@
continue;
}

let mut skip_by_write_amp = false;
// Limit the number of selection levels for the non-overlapping
// sub_level at least level0_sub_level_compact_level_count
for (plan_index, input) in l0_select_tables_vec.into_iter().enumerate() {
if plan_index == 0
&& input.sstable_infos.len()
< self.config.level0_sub_level_compact_level_count as usize
{
// first plan level count smaller than limit
break;
}

let validator = CompactionTaskValidator::new(self.config.clone());
for input in l0_select_tables_vec {
let mut max_level_size = 0;
for level_select_table in &input.sstable_infos {
let level_select_size = level_select_table
@@ -306,22 +285,6 @@
max_level_size = std::cmp::max(max_level_size, level_select_size);
}

// This limitation would keep our write-amplification no more than
// ln(max_compaction_bytes/flush_level_bytes) /
// ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half
// of level0_sub_level_compact_level_count just for convenient.
let is_write_amp_large =
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
>= input.total_file_size;

if (is_write_amp_large
|| input.sstable_infos.len() < tier_sub_level_compact_level_count)
&& input.total_file_count < self.config.level0_max_compact_file_number as usize
{
skip_by_write_amp = true;
continue;
}

let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
for level_select_sst in input.sstable_infos {
if level_select_sst.is_empty() {
@@ -334,15 +297,19 @@
});
}
select_level_inputs.reverse();
return Some(CompactionInput {

let result = CompactionInput {
input_levels: select_level_inputs,
target_level: 0,
target_sub_level_id: level.sub_level_id,
});
}
};

if skip_by_write_amp {
stats.skip_by_write_amp_limit += 1;
if !validator.valid_compact_task(&result, CompactionTaskOptimizeRule::Intra, stats)
{
continue;
}

return Some(result);
}
}

@@ -918,7 +885,6 @@ pub mod tests {
// But stopped by pending sub-level when trying to include more sub-levels.
let mut picker = LevelCompactionPicker::new(1, config.clone());
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);

assert!(ret.is_none());

// Free the pending sub-level.
@@ -964,7 +930,6 @@
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
// println!("ret.input_levels: {:?}", ret.input_levels);
// 1. trivial_move
assert_eq!(2, ret.input_levels.len());
assert!(ret.input_levels[1].table_infos.is_empty());
@@ -974,7 +939,6 @@
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
println!("ret.input_levels: {:?}", ret.input_levels);
assert_eq!(3, ret.input_levels.len());
assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
}
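To summarize the new control flow in base_level_compaction_picker.rs: both call sites build a CompactionInput, pass it to CompactionTaskValidator::valid_compact_task together with a rule tag (CompactionTaskOptimizeRule::ToBase for the L0-to-base path, Intra for the intra-L0 path), and either continue scanning candidates or return the first valid one. The sketch below is a hedged illustration of that shape; the types are simplified stand-ins and the thresholds inside valid_compact_task are assumptions, not the actual validator rules.

use std::sync::Arc;

// Simplified stand-ins; the real validator checks write amplification,
// pending file sizes, and file-count limits derived from CompactionConfig.
#[derive(Debug)]
struct CompactionInput {
    total_bytes: u64,
    target_level: usize,
}

enum CompactionTaskOptimizeRule {
    ToBase,
    Intra,
}

#[derive(Default)]
struct LocalPickerStatistic {
    skip_by_write_amp_limit: u64,
}

struct CompactionTaskValidator {
    max_task_bytes: u64,
}

impl CompactionTaskValidator {
    fn valid_compact_task(
        &self,
        input: &CompactionInput,
        rule: CompactionTaskOptimizeRule,
        stats: &mut LocalPickerStatistic,
    ) -> bool {
        // Illustrative rule-specific thresholds; the skip reason is recorded
        // here so the picker no longer updates the stats itself.
        let limit = match rule {
            CompactionTaskOptimizeRule::ToBase => self.max_task_bytes,
            CompactionTaskOptimizeRule::Intra => self.max_task_bytes / 2,
        };
        if input.total_bytes > limit {
            stats.skip_by_write_amp_limit += 1;
            return false;
        }
        true
    }
}

fn pick_first_valid(
    candidates: Vec<CompactionInput>,
    validator: &Arc<CompactionTaskValidator>,
    stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
    for result in candidates {
        let rule = if result.target_level == 0 {
            CompactionTaskOptimizeRule::Intra
        } else {
            CompactionTaskOptimizeRule::ToBase
        };
        // Same pattern as the picker: skip invalid candidates, return the first valid one.
        if !validator.valid_compact_task(&result, rule, stats) {
            continue;
        }
        return Some(result);
    }
    None
}

fn main() {
    let validator = Arc::new(CompactionTaskValidator { max_task_bytes: 1_000 });
    let mut stats = LocalPickerStatistic::default();
    let picked = pick_first_valid(
        vec![
            CompactionInput { total_bytes: 4_000, target_level: 1 },
            CompactionInput { total_bytes: 600, target_level: 1 },
        ],
        &validator,
        &mut stats,
    );
    println!("picked = {:?}, skipped = {}", picked, stats.skip_by_write_amp_limit);
}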