-
Notifications
You must be signed in to change notification settings - Fork 641
refactor(storage): separate validator logic from picker #11984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
f000238
feat(storage): separate validator logic from picker
Li0k 2d13fe4
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k b6c0c1c
refactor(storage): refactor input validator
Li0k 0a32009
refactor(storage): remove level type check of tier compaction picker
Li0k f7892e9
chore(storage): fix comments
Li0k 8a1be94
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k 8aec8f5
refactor(storage): refactor to reduce cpu calculation
Li0k 64c7033
refactor(storage): refactor CompactionTaskValidatorRule
Li0k ccd22dc
fix(storage): fix picker skip metrics
Li0k d8bec2f
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k 11f262c
fix(storage): revert some base level compaction check
Li0k 5a35b3f
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k d097f64
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,18 @@ use risingwave_pb::hummock::hummock_version::Levels; | |
use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel}; | ||
|
||
use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; | ||
use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; | ||
use super::{ | ||
CompactionInput, CompactionPicker, CompactionTaskOptimizeRule, CompactionTaskValidator, | ||
LocalPickerStatistic, | ||
}; | ||
use crate::hummock::compaction::create_overlap_strategy; | ||
use crate::hummock::compaction::picker::TrivialMovePicker; | ||
use crate::hummock::level_handler::LevelHandler; | ||
|
||
/// Compaction picker bound to one target level; it carries the shared compaction | ||
/// config plus a shared validator that vets candidate tasks before they are returned. | ||
pub struct LevelCompactionPicker { | ||
// Level index copied into `CompactionInput::target_level` for L0 -> base tasks. | ||
target_level: usize, | ||
// Shared compaction settings (read for the compaction mode and size limits). | ||
config: Arc<CompactionConfig>, | ||
// Shared validator consulted via `valid_compact_task` before a candidate is returned. | ||
compaction_task_validator: Arc<CompactionTaskValidator>, | ||
} | ||
|
||
impl CompactionPicker for LevelCompactionPicker { | ||
|
@@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker { | |
} | ||
|
||
impl LevelCompactionPicker { | ||
/// Test-only constructor: builds a picker for `target_level` and creates its own | ||
/// `CompactionTaskValidator` from the same `config`. | ||
#[cfg(test)] | ||
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> LevelCompactionPicker { | ||
LevelCompactionPicker { | ||
target_level, | ||
// Clone the `Arc` for the validator first; `config` itself is moved on the next line. | ||
compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), | ||
config, | ||
} | ||
} | ||
|
||
/// Builds a picker for `target_level` that reuses a caller-supplied validator | ||
/// instead of constructing a new one. | ||
/// | ||
/// All three arguments are stored as-is in the returned `LevelCompactionPicker`. | ||
pub fn new_with_validator( | ||
target_level: usize, | ||
config: Arc<CompactionConfig>, | ||
compaction_task_validator: Arc<CompactionTaskValidator>, | ||
) -> LevelCompactionPicker { | ||
LevelCompactionPicker { | ||
target_level, | ||
config, | ||
compaction_task_validator, | ||
} | ||
} | ||
|
||
fn pick_base_trivial_move( | ||
&self, | ||
l0: &OverlappingLevel, | ||
|
@@ -117,21 +135,6 @@ impl LevelCompactionPicker { | |
level_handlers: &[LevelHandler], | ||
stats: &mut LocalPickerStatistic, | ||
) -> Option<CompactionInput> { | ||
let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size(); | ||
let base_level_size = target_level.total_file_size | ||
- level_handlers[target_level.level_idx as usize].get_pending_file_size(); | ||
|
||
if l0_size < base_level_size { | ||
stats.skip_by_write_amp_limit += 1; | ||
return None; | ||
} | ||
|
||
// no running base_compaction | ||
let strict_check = level_handlers[0] | ||
.get_pending_tasks() | ||
.iter() | ||
.any(|task| task.target_level != 0); | ||
|
||
let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); | ||
let min_compaction_bytes = self.config.sub_level_max_compaction_bytes; | ||
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( | ||
|
@@ -157,7 +160,7 @@ impl LevelCompactionPicker { | |
|
||
let mut skip_by_pending = false; | ||
let mut input_levels = vec![]; | ||
let mut min_write_amp_meet = false; | ||
|
||
for input in l0_select_tables_vec { | ||
let l0_select_tables = input | ||
.sstable_infos | ||
|
@@ -184,16 +187,6 @@ impl LevelCompactionPicker { | |
continue; | ||
} | ||
|
||
// The size of target level may be too large, we shall skip this compact task and wait | ||
// the data in base level compact to lower level. | ||
if target_level_size > self.config.max_compaction_bytes && strict_check { | ||
continue; | ||
} | ||
|
||
if input.total_file_size >= target_level_size { | ||
min_write_amp_meet = true; | ||
} | ||
|
||
input_levels.push((input, target_level_size, target_level_ssts)); | ||
} | ||
|
||
|
@@ -204,20 +197,7 @@ impl LevelCompactionPicker { | |
return None; | ||
} | ||
|
||
if !min_write_amp_meet && strict_check { | ||
// If the write-amplification of all candidate task are large, we may hope to wait base | ||
// level compact more data to lower level. But if we skip all task, I'm | ||
// afraid the data will be blocked in level0 and will be never compacted to base level. | ||
// So we only allow one task exceed write-amplification-limit running in | ||
// level0 to base-level. | ||
return None; | ||
} | ||
|
||
for (input, target_file_size, target_level_files) in input_levels { | ||
if min_write_amp_meet && input.total_file_size < target_file_size { | ||
continue; | ||
} | ||
|
||
for (input, _target_file_size, target_level_files) in input_levels { | ||
let mut select_level_inputs = input | ||
.sstable_infos | ||
.into_iter() | ||
|
@@ -233,11 +213,22 @@ impl LevelCompactionPicker { | |
level_type: target_level.level_type, | ||
table_infos: target_level_files, | ||
}); | ||
return Some(CompactionInput { | ||
|
||
let result = CompactionInput { | ||
input_levels: select_level_inputs, | ||
target_level: self.target_level, | ||
target_sub_level_id: 0, | ||
}); | ||
}; | ||
|
||
if !self.compaction_task_validator.valid_compact_task( | ||
&result, | ||
CompactionTaskOptimizeRule::ToBase, | ||
stats, | ||
) { | ||
continue; | ||
} | ||
|
||
return Some(result); | ||
} | ||
stats.skip_by_write_amp_limit += 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we still need to update the stats here? |
||
None | ||
|
@@ -267,8 +258,6 @@ impl LevelCompactionPicker { | |
self.config.sub_level_max_compaction_bytes, | ||
); | ||
|
||
let tier_sub_level_compact_level_count = | ||
self.config.level0_sub_level_compact_level_count as usize; | ||
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( | ||
self.config.sub_level_max_compaction_bytes / 2, | ||
max_compaction_bytes, | ||
|
@@ -284,18 +273,8 @@ impl LevelCompactionPicker { | |
continue; | ||
} | ||
|
||
let mut skip_by_write_amp = false; | ||
// Limit the number of selection levels for the non-overlapping | ||
// sub_level at least level0_sub_level_compact_level_count | ||
for (plan_index, input) in l0_select_tables_vec.into_iter().enumerate() { | ||
if plan_index == 0 | ||
&& input.sstable_infos.len() | ||
< self.config.level0_sub_level_compact_level_count as usize | ||
{ | ||
// first plan level count smaller than limit | ||
break; | ||
} | ||
|
||
let validator = CompactionTaskValidator::new(self.config.clone()); | ||
for input in l0_select_tables_vec { | ||
let mut max_level_size = 0; | ||
for level_select_table in &input.sstable_infos { | ||
let level_select_size = level_select_table | ||
|
@@ -306,22 +285,6 @@ impl LevelCompactionPicker { | |
max_level_size = std::cmp::max(max_level_size, level_select_size); | ||
} | ||
|
||
// This limitation would keep our write-amplification no more than | ||
// ln(max_compaction_bytes/flush_level_bytes) / | ||
// ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half | ||
// of level0_sub_level_compact_level_count just for convenient. | ||
let is_write_amp_large = | ||
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 | ||
>= input.total_file_size; | ||
|
||
if (is_write_amp_large | ||
|| input.sstable_infos.len() < tier_sub_level_compact_level_count) | ||
&& input.total_file_count < self.config.level0_max_compact_file_number as usize | ||
{ | ||
skip_by_write_amp = true; | ||
continue; | ||
} | ||
|
||
let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len()); | ||
for level_select_sst in input.sstable_infos { | ||
if level_select_sst.is_empty() { | ||
|
@@ -334,15 +297,19 @@ impl LevelCompactionPicker { | |
}); | ||
} | ||
select_level_inputs.reverse(); | ||
return Some(CompactionInput { | ||
|
||
let result = CompactionInput { | ||
input_levels: select_level_inputs, | ||
target_level: 0, | ||
target_sub_level_id: level.sub_level_id, | ||
}); | ||
} | ||
}; | ||
|
||
if skip_by_write_amp { | ||
stats.skip_by_write_amp_limit += 1; | ||
if !validator.valid_compact_task(&result, CompactionTaskOptimizeRule::Intra, stats) | ||
{ | ||
continue; | ||
} | ||
|
||
return Some(result); | ||
} | ||
} | ||
|
||
|
@@ -918,7 +885,6 @@ pub mod tests { | |
// But stopped by pending sub-level when trying to include more sub-levels. | ||
let mut picker = LevelCompactionPicker::new(1, config.clone()); | ||
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); | ||
|
||
assert!(ret.is_none()); | ||
|
||
// Free the pending sub-level. | ||
|
@@ -964,7 +930,6 @@ pub mod tests { | |
let ret = picker | ||
.pick_compaction(&levels, &levels_handler, &mut local_stats) | ||
.unwrap(); | ||
// println!("ret.input_levels: {:?}", ret.input_levels); | ||
// 1. trivial_move | ||
assert_eq!(2, ret.input_levels.len()); | ||
assert!(ret.input_levels[1].table_infos.is_empty()); | ||
|
@@ -974,7 +939,6 @@ pub mod tests { | |
let ret = picker | ||
.pick_compaction(&levels, &levels_handler, &mut local_stats) | ||
.unwrap(); | ||
println!("ret.input_levels: {:?}", ret.input_levels); | ||
assert_eq!(3, ret.input_levels.len()); | ||
assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this affect the strategy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may affect performance, but there is no correctness issue and I wanted to prioritize simplifying the code.
And will there be a new base level picker strategy?