diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index b6a3fc1a57b5c..0edfeb0e5e130 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -26,8 +26,8 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType}; use super::picker::{ - CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, - TtlReclaimCompactionPicker, + CompactionTaskValidator, IntraCompactionPicker, SpaceReclaimCompactionPicker, + SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker, }; use super::{ create_compaction_task, LevelCompactionPicker, ManualCompactionOption, ManualCompactionPicker, @@ -44,6 +44,23 @@ use crate::rpc::metrics::MetaMetrics; pub const SCORE_BASE: u64 = 100; +#[derive(Debug, Default, Clone)] +pub enum PickerType { + Tier, + Intra, + ToBase, + #[default] + BottomLevel, +} + +#[derive(Default, Debug)] +pub struct PickerInfo { + score: u64, + select_level: usize, + target_level: usize, + picker_type: PickerType, +} + pub trait LevelSelector: Sync + Send { fn pick_compaction( &mut self, @@ -71,7 +88,7 @@ pub struct SelectContext { // size of the files in `base_level` reaches its capacity, we will place data in a higher // level, which equals to `base_level -= 1;`. 
pub base_level: usize, - pub score_levels: Vec<(u64, usize, usize)>, + pub score_levels: Vec, } pub struct DynamicLevelSelectorCore { @@ -92,33 +109,34 @@ impl DynamicLevelSelectorCore { fn create_compaction_picker( &self, - select_level: usize, - target_level: usize, + picker_info: &PickerInfo, overlap_strategy: Arc, compaction_task_validator: Arc, ) -> Box { - if select_level == 0 { - if target_level == 0 { - Box::new(TierCompactionPicker::new_with_validator( - self.config.clone(), - compaction_task_validator, - )) - } else { - Box::new(LevelCompactionPicker::new_with_validator( - target_level, - self.config.clone(), - compaction_task_validator, + match picker_info.picker_type { + PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator( + self.config.clone(), + compaction_task_validator, + )), + PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator( + picker_info.target_level, + self.config.clone(), + compaction_task_validator, + )), + PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator( + self.config.clone(), + compaction_task_validator, + )), + PickerType::BottomLevel => { + assert_eq!(picker_info.select_level + 1, picker_info.target_level); + Box::new(MinOverlappingPicker::new( + picker_info.select_level, + picker_info.target_level, + self.config.max_bytes_for_level_base, + self.config.split_by_state_table, + overlap_strategy, )) } - } else { - assert_eq!(select_level + 1, target_level); - Box::new(MinOverlappingPicker::new( - select_level, - target_level, - self.config.max_bytes_for_level_base, - self.config.split_by_state_table, - overlap_strategy, - )) } } @@ -217,8 +235,12 @@ impl DynamicLevelSelectorCore { std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE / self.config.level0_tier_compact_file_number; // Reduce the level num of l0 overlapping sub_level - ctx.score_levels - .push((std::cmp::max(l0_overlapping_score, SCORE_BASE + 1), 0, 0)); + ctx.score_levels.push(PickerInfo { + 
score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1), + select_level: 0, + target_level: 0, + picker_type: PickerType::Tier, + }) } // The read query at the non-overlapping level only selects ssts that match the query @@ -254,8 +276,24 @@ impl DynamicLevelSelectorCore { }; // Reduce the level num of l0 non-overlapping sub_level - ctx.score_levels - .push((non_overlapping_score, 0, ctx.base_level)); + ctx.score_levels.push({ + PickerInfo { + score: non_overlapping_score, + select_level: 0, + target_level: ctx.base_level, + picker_type: PickerType::ToBase, + } + }); + + // FIXME: more accurate score calculation algorithm will be introduced (#11903) + ctx.score_levels.push({ + PickerInfo { + score: non_overlapping_score, + select_level: 0, + target_level: 0, + picker_type: PickerType::Intra, + } + }); } // The bottommost level can not be input level. @@ -275,16 +313,23 @@ impl DynamicLevelSelectorCore { if total_size == 0 { continue; } - ctx.score_levels.push(( - total_size * SCORE_BASE / ctx.level_max_bytes[level_idx], - level_idx, - level_idx + 1, - )); + + ctx.score_levels.push({ + PickerInfo { + score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx], + select_level: level_idx, + target_level: level_idx + 1, + picker_type: PickerType::BottomLevel, + } + }); } // sort reverse to pick the largest one. 
- ctx.score_levels - .sort_by(|a, b| b.0.cmp(&a.0).then_with(|| a.2.cmp(&b.2))); + ctx.score_levels.sort_by(|a, b| { + b.score + .cmp(&a.score) + .then_with(|| a.target_level.cmp(&b.target_level)) + }); ctx } @@ -379,21 +424,20 @@ impl LevelSelector for DynamicLevelSelector { let overlap_strategy = create_overlap_strategy(compaction_group.compaction_config.compaction_mode()); let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers); - // TODO: Determine which rule to enable by write limit let compaction_task_validator = Arc::new(CompactionTaskValidator::new( compaction_group.compaction_config.clone(), )); - for (score, select_level, target_level) in ctx.score_levels { - if score <= SCORE_BASE { + for picker_info in &ctx.score_levels { + if picker_info.score <= SCORE_BASE { return None; } let mut picker = dynamic_level_core.create_compaction_picker( - select_level, - target_level, + picker_info, overlap_strategy.clone(), compaction_task_validator.clone(), ); + let mut stats = LocalPickerStatistic::default(); if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) { ret.add_pending_task(task_id, level_handlers); @@ -404,9 +448,11 @@ impl LevelSelector for DynamicLevelSelector { self.task_type(), )); } - selector_stats - .skip_picker - .push((select_level, target_level, stats)); + selector_stats.skip_picker.push(( + picker_info.select_level, + picker_info.target_level, + stats, + )); } None } diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index c8025a9e99ac9..c224fbfe6ce55 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -79,11 +79,7 @@ impl CompactionPicker for LevelCompactionPicker { return Some(ret); } - if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { - return Some(ret); - } - - 
self.pick_l0_trivial_move_file(l0, level_handlers, stats) + None } } @@ -252,172 +248,6 @@ impl LevelCompactionPicker { } None } - - fn pick_l0_intra( - &self, - l0: &OverlappingLevel, - level_handler: &LevelHandler, - stats: &mut LocalPickerStatistic, - ) -> Option { - let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); - - for (idx, level) in l0.sub_levels.iter().enumerate() { - if level.level_type() != LevelType::Nonoverlapping - || level.total_file_size > self.config.sub_level_max_compaction_bytes - { - continue; - } - - if level_handler.is_level_all_pending_compact(level) { - continue; - } - - let max_compaction_bytes = std::cmp::min( - self.config.max_compaction_bytes, - self.config.sub_level_max_compaction_bytes, - ); - - let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( - self.config.sub_level_max_compaction_bytes / 2, - max_compaction_bytes, - self.config.level0_sub_level_compact_level_count as usize, - self.config.level0_max_compact_file_number, - overlap_strategy.clone(), - ); - - let l0_select_tables_vec = non_overlap_sub_level_picker - .pick_l0_multi_non_overlap_level(&l0.sub_levels[idx..], level_handler); - - if l0_select_tables_vec.is_empty() { - continue; - } - - let validator = CompactionTaskValidator::new(self.config.clone()); - let mut select_input_size = 0; - let mut total_file_count = 0; - for input in l0_select_tables_vec { - let mut max_level_size = 0; - for level_select_table in &input.sstable_infos { - let level_select_size = level_select_table - .iter() - .map(|sst| sst.file_size) - .sum::(); - - max_level_size = std::cmp::max(max_level_size, level_select_size); - } - - let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len()); - for level_select_sst in input.sstable_infos { - if level_select_sst.is_empty() { - continue; - } - select_level_inputs.push(InputLevel { - level_idx: 0, - level_type: LevelType::Nonoverlapping as i32, - table_infos: level_select_sst, - }); - - 
select_input_size += input.total_file_size; - total_file_count += input.total_file_count; - } - select_level_inputs.reverse(); - - let result = CompactionInput { - input_levels: select_level_inputs, - target_sub_level_id: level.sub_level_id, - select_input_size, - total_file_count: total_file_count as u64, - ..Default::default() - }; - - if !validator.valid_compact_task(&result, ValidationRuleType::Intra, stats) { - continue; - } - - return Some(result); - } - } - - None - } - - fn pick_l0_trivial_move_file( - &self, - l0: &OverlappingLevel, - level_handlers: &[LevelHandler], - stats: &mut LocalPickerStatistic, - ) -> Option { - let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); - - for (idx, level) in l0.sub_levels.iter().enumerate() { - if level.level_type == LevelType::Overlapping as i32 || idx + 1 >= l0.sub_levels.len() { - continue; - } - - if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping as i32 { - continue; - } - - let trivial_move_picker = TrivialMovePicker::new(0, 0, overlap_strategy.clone()); - - let select_sst = trivial_move_picker.pick_trivial_move_sst( - &l0.sub_levels[idx + 1].table_infos, - &level.table_infos, - level_handlers, - stats, - ); - - // only pick tables for trivial move - if select_sst.is_none() { - continue; - } - - let select_sst = select_sst.unwrap(); - - // support trivial move cross multi sub_levels - let mut overlap = overlap_strategy.create_overlap_info(); - overlap.update(&select_sst); - - assert!(overlap - .check_multiple_overlap(&l0.sub_levels[idx].table_infos) - .is_empty()); - let mut target_level_idx = idx; - while target_level_idx > 0 { - if l0.sub_levels[target_level_idx - 1].level_type - != LevelType::Nonoverlapping as i32 - || !overlap - .check_multiple_overlap(&l0.sub_levels[target_level_idx - 1].table_infos) - .is_empty() - { - break; - } - target_level_idx -= 1; - } - - let select_input_size = select_sst.file_size; - let input_levels = vec![ - InputLevel { - level_idx: 0, - 
level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![select_sst], - }, - InputLevel { - level_idx: 0, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![], - }, - ]; - return Some(CompactionInput { - input_levels, - target_level: 0, - target_sub_level_id: l0.sub_levels[target_level_idx].sub_level_id, - select_input_size, - total_file_count: 1, - ..Default::default() - }); - } - None - } } #[cfg(test)] @@ -655,42 +485,6 @@ pub mod tests { assert!(ret.is_none()); } - #[test] - fn test_compacting_key_range_overlap_intra_l0() { - // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's - // compacting_key_range. - let mut picker = create_compaction_picker_for_test(); - - let mut levels = Levels { - levels: vec![Level { - level_idx: 1, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![generate_table(3, 1, 200, 300, 2)], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, - }], - l0: Some(generate_l0_nonoverlapping_sublevels(vec![ - generate_table(1, 1, 100, 210, 2), - generate_table(2, 1, 200, 250, 2), - ])), - member_table_ids: vec![1], - ..Default::default() - }; - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - - let mut local_stats = LocalPickerStatistic::default(); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - ret.add_pending_task(0, &mut levels_handler); - - push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3)); - assert!(picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .is_none()); - } - #[test] fn test_skip_compact_write_amplification_limit() { let config: CompactionConfig = CompactionConfigBuilder::new() @@ -817,55 +611,6 @@ pub mod tests { ); } - #[test] - fn test_issue_11154() { - let mut local_stats = LocalPickerStatistic::default(); - let mut l0 = generate_l0_overlapping_sublevels(vec![ - vec![ - generate_table(4, 1, 1, 200, 1), - 
generate_table(5, 1, 400, 600, 1), - ], - vec![ - generate_table(6, 1, 1, 200, 1), - generate_table(7, 1, 400, 600, 1), - ], - vec![ - generate_table(8, 1, 1, 200, 1), - generate_table(9, 1, 400, 600, 1), - ], - vec![generate_table(10, 1, 1, 600, 1)], - ]); - // We can set level_type only because the input above is valid. - for s in &mut l0.sub_levels { - s.level_type = LevelType::Nonoverlapping as i32; - } - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - - // Pick with large max_compaction_bytes results all sub levels included in input. - let config = Arc::new( - CompactionConfigBuilder::new() - .max_compaction_bytes(800) - .sub_level_max_compaction_bytes(50000) - .max_bytes_for_level_base(500000) - .level0_sub_level_compact_level_count(1) - .build(), - ); - // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION. - // So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION. 
- let mut picker = LevelCompactionPicker::new(1, config); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - // avoid add sst_10 and cause a big task - assert_eq!(3, ret.input_levels.len()); - } - #[test] fn test_l0_to_l1_break_on_pending_sub_level() { let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ @@ -972,262 +717,4 @@ pub mod tests { assert_eq!(3, ret.input_levels.len()); assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); } - - #[test] - fn test_pick_l0_intra() { - { - let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ - vec![ - generate_table(6, 1, 50, 99, 1), - generate_table(1, 1, 100, 200, 1), - generate_table(2, 1, 250, 300, 1), - ], - vec![ - generate_table(3, 1, 10, 90, 1), - generate_table(6, 1, 100, 110, 1), - ], - vec![ - generate_table(4, 1, 50, 99, 1), - generate_table(5, 1, 100, 200, 1), - ], - ]); - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); - let config = Arc::new( - CompactionConfigBuilder::new() - .level0_sub_level_compact_level_count(1) - .level0_overlapping_sub_level_compact_level_count(4) - .build(), - ); - let mut picker = LevelCompactionPicker::new(1, config); - let mut local_stats = LocalPickerStatistic::default(); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - ret.add_pending_task(1, &mut levels_handler); - assert_eq!( - ret.input_levels - .iter() - .map(|i| i.table_infos.len()) - .sum::(), - 3 - ); - } - - { - // Suppose keyguard [100, 200] [300, 400] - // will pick sst [1, 3, 4] - let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ - vec![ - generate_table(1, 1, 100, 200, 1), - generate_table(2, 1, 300, 400, 1), - ], - vec![ - 
generate_table(3, 1, 100, 200, 1), - generate_table(6, 1, 300, 500, 1), - ], - vec![ - generate_table(4, 1, 100, 200, 1), - generate_table(5, 1, 300, 400, 1), - ], - ]); - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); - let config = Arc::new( - CompactionConfigBuilder::new() - .level0_sub_level_compact_level_count(1) - .build(), - ); - let mut picker = LevelCompactionPicker::new(1, config); - let mut local_stats = LocalPickerStatistic::default(); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - ret.add_pending_task(1, &mut levels_handler); - assert_eq!( - ret.input_levels - .iter() - .map(|i| i.table_infos.len()) - .sum::(), - 3 - ); - - assert_eq!(4, ret.input_levels[0].table_infos[0].get_sst_id()); - assert_eq!(3, ret.input_levels[1].table_infos[0].get_sst_id()); - assert_eq!(1, ret.input_levels[2].table_infos[0].get_sst_id()); - - // will pick sst [2, 6, 5] - let ret2 = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - - assert_eq!( - ret2.input_levels - .iter() - .map(|i| i.table_infos.len()) - .sum::(), - 3 - ); - - assert_eq!(5, ret2.input_levels[0].table_infos[0].get_sst_id()); - assert_eq!(6, ret2.input_levels[1].table_infos[0].get_sst_id()); - assert_eq!(2, ret2.input_levels[2].table_infos[0].get_sst_id()); - } - - { - let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ - vec![ - generate_table(1, 1, 100, 149, 1), - generate_table(6, 1, 150, 199, 1), - generate_table(7, 1, 200, 250, 1), - generate_table(2, 1, 300, 400, 1), - ], - vec![ - generate_table(3, 1, 100, 149, 1), - generate_table(8, 1, 150, 199, 1), - generate_table(9, 1, 200, 250, 1), - generate_table(10, 1, 300, 400, 1), - ], - 
vec![ - generate_table(4, 1, 100, 199, 1), - generate_table(11, 1, 200, 250, 1), - generate_table(5, 1, 300, 350, 1), - ], - ]); - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); - let config = Arc::new( - CompactionConfigBuilder::new() - .level0_sub_level_compact_level_count(1) - .build(), - ); - let mut picker = LevelCompactionPicker::new(1, config); - let mut local_stats = LocalPickerStatistic::default(); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - ret.add_pending_task(1, &mut levels_handler); - assert_eq!( - ret.input_levels - .iter() - .map(|i| i.table_infos.len()) - .sum::(), - 3 - ); - - assert_eq!(11, ret.input_levels[0].table_infos[0].get_sst_id()); - assert_eq!(9, ret.input_levels[1].table_infos[0].get_sst_id()); - assert_eq!(7, ret.input_levels[2].table_infos[0].get_sst_id()); - - let ret2 = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - - assert_eq!( - ret2.input_levels - .iter() - .map(|i| i.table_infos.len()) - .sum::(), - 3 - ); - - assert_eq!(5, ret2.input_levels[0].table_infos[0].get_sst_id()); - assert_eq!(10, ret2.input_levels[1].table_infos[0].get_sst_id()); - assert_eq!(2, ret2.input_levels[2].table_infos[0].get_sst_id()); - } - } - - fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool { - compaction_input.input_levels.len() == 2 - && !compaction_input.input_levels[0].table_infos.is_empty() - && compaction_input.input_levels[1].table_infos.is_empty() - } - - #[test] - fn test_trivial_move() { - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - let config = Arc::new( - CompactionConfigBuilder::new() - .level0_tier_compact_file_number(2) - 
.target_file_size_base(30) - .level0_sub_level_compact_level_count(20) // reject intra - .build(), - ); - let mut picker = LevelCompactionPicker::new(1, config); - - // Cannot trivial move because there is only 1 sub-level. - let l0 = generate_l0_overlapping_sublevels(vec![vec![ - generate_table(1, 1, 100, 110, 1), - generate_table(2, 1, 150, 250, 1), - ]]); - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); - let mut local_stats = LocalPickerStatistic::default(); - let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); - - // Cannot trivial move because sub-levels are overlapping - let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![ - vec![ - generate_table(1, 1, 100, 110, 1), - generate_table(2, 1, 150, 250, 1), - ], - vec![generate_table(3, 1, 10, 90, 1)], - vec![generate_table(4, 1, 10, 90, 1)], - vec![generate_table(5, 1, 10, 90, 1)], - ]); - let mut levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - assert!(picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .is_none()); - - // Cannot trivial move because latter sub-level is overlapping - levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Nonoverlapping as i32; - levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Overlapping as i32; - let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); - - // Cannot trivial move because former sub-level is overlapping - levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Overlapping as i32; - levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Nonoverlapping as i32; - 
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); - - // trivial move - levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Nonoverlapping as i32; - levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Nonoverlapping as i32; - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - assert!(is_l0_trivial_move(&ret)); - assert_eq!(ret.input_levels[0].table_infos.len(), 1); - } } diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs new file mode 100644 index 0000000000000..541b93254172b --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -0,0 +1,675 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use risingwave_pb::hummock::hummock_version::Levels; +use risingwave_pb::hummock::{CompactionConfig, InputLevel, LevelType, OverlappingLevel}; + +use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic, + ValidationRuleType, +}; +use crate::hummock::compaction::create_overlap_strategy; +use crate::hummock::compaction::picker::TrivialMovePicker; +use crate::hummock::level_handler::LevelHandler; + +pub struct IntraCompactionPicker { + config: Arc, + compaction_task_validator: Arc, +} + +impl CompactionPicker for IntraCompactionPicker { + fn pick_compaction( + &mut self, + levels: &Levels, + level_handlers: &[LevelHandler], + stats: &mut LocalPickerStatistic, + ) -> Option { + let l0 = levels.l0.as_ref().unwrap(); + if l0.sub_levels.is_empty() { + return None; + } + if l0.sub_levels[0].level_type != LevelType::Nonoverlapping as i32 + && l0.sub_levels[0].table_infos.len() > 1 + { + stats.skip_by_overlapping += 1; + return None; + } + + let is_l0_pending_compact = + level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]); + + if is_l0_pending_compact { + stats.skip_by_pending_files += 1; + return None; + } + + if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { + return Some(ret); + } + + self.pick_l0_trivial_move_file(l0, level_handlers, stats) + } +} + +impl IntraCompactionPicker { + #[cfg(test)] + pub fn new(config: Arc) -> IntraCompactionPicker { + IntraCompactionPicker { + compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), + config, + } + } + + pub fn new_with_validator( + config: Arc, + compaction_task_validator: Arc, + ) -> IntraCompactionPicker { + IntraCompactionPicker { + config, + compaction_task_validator, + } + } + + fn pick_l0_intra( + &self, + l0: &OverlappingLevel, + level_handler: &LevelHandler, + stats: &mut LocalPickerStatistic, + ) -> Option { + let 
overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); + + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.level_type() != LevelType::Nonoverlapping + || level.total_file_size > self.config.sub_level_max_compaction_bytes + { + continue; + } + + if level_handler.is_level_all_pending_compact(level) { + continue; + } + + let max_compaction_bytes = std::cmp::min( + self.config.max_compaction_bytes, + self.config.sub_level_max_compaction_bytes, + ); + + let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( + self.config.sub_level_max_compaction_bytes / 2, + max_compaction_bytes, + self.config.level0_sub_level_compact_level_count as usize, + self.config.level0_max_compact_file_number, + overlap_strategy.clone(), + ); + + let l0_select_tables_vec = non_overlap_sub_level_picker + .pick_l0_multi_non_overlap_level(&l0.sub_levels[idx..], level_handler); + + if l0_select_tables_vec.is_empty() { + continue; + } + + let mut select_input_size = 0; + let mut total_file_count = 0; + for input in l0_select_tables_vec { + let mut max_level_size = 0; + for level_select_table in &input.sstable_infos { + let level_select_size = level_select_table + .iter() + .map(|sst| sst.file_size) + .sum::(); + + max_level_size = std::cmp::max(max_level_size, level_select_size); + } + + let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len()); + for level_select_sst in input.sstable_infos { + if level_select_sst.is_empty() { + continue; + } + select_level_inputs.push(InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos: level_select_sst, + }); + + select_input_size += input.total_file_size; + total_file_count += input.total_file_count; + } + select_level_inputs.reverse(); + + let result = CompactionInput { + input_levels: select_level_inputs, + target_sub_level_id: level.sub_level_id, + select_input_size, + total_file_count: total_file_count as u64, + ..Default::default() + }; + + if 
!self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::Intra, + stats, + ) { + continue; + } + + return Some(result); + } + } + + None + } + + fn pick_l0_trivial_move_file( + &self, + l0: &OverlappingLevel, + level_handlers: &[LevelHandler], + stats: &mut LocalPickerStatistic, + ) -> Option { + let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); + + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.level_type == LevelType::Overlapping as i32 || idx + 1 >= l0.sub_levels.len() { + continue; + } + + if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping as i32 { + continue; + } + + let trivial_move_picker = TrivialMovePicker::new(0, 0, overlap_strategy.clone()); + + let select_sst = trivial_move_picker.pick_trivial_move_sst( + &l0.sub_levels[idx + 1].table_infos, + &level.table_infos, + level_handlers, + stats, + ); + + // only pick tables for trivial move + if select_sst.is_none() { + continue; + } + + let select_sst = select_sst.unwrap(); + + // support trivial move cross multi sub_levels + let mut overlap = overlap_strategy.create_overlap_info(); + overlap.update(&select_sst); + + assert!(overlap + .check_multiple_overlap(&l0.sub_levels[idx].table_infos) + .is_empty()); + let mut target_level_idx = idx; + while target_level_idx > 0 { + if l0.sub_levels[target_level_idx - 1].level_type + != LevelType::Nonoverlapping as i32 + || !overlap + .check_multiple_overlap(&l0.sub_levels[target_level_idx - 1].table_infos) + .is_empty() + { + break; + } + target_level_idx -= 1; + } + + let select_input_size = select_sst.file_size; + let input_levels = vec![ + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![select_sst], + }, + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![], + }, + ]; + return Some(CompactionInput { + input_levels, + target_level: 0, + target_sub_level_id: 
l0.sub_levels[target_level_idx].sub_level_id, + select_input_size, + total_file_count: 1, + ..Default::default() + }); + } + None + } +} + +#[cfg(test)] +pub mod tests { + use risingwave_pb::hummock::Level; + + use super::*; + use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; + use crate::hummock::compaction::level_selector::tests::{ + generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels, + generate_l0_overlapping_sublevels, generate_level, generate_table, + push_table_level0_overlapping, push_tables_level0_nonoverlapping, + }; + use crate::hummock::compaction::TierCompactionPicker; + + fn create_compaction_picker_for_test() -> IntraCompactionPicker { + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .level0_sub_level_compact_level_count(1) + .build(), + ); + IntraCompactionPicker::new(config) + } + + #[test] + fn test_l0_to_l1_compact_conflict() { + // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's + // compacting_key_range. 
+ let mut picker = create_compaction_picker_for_test(); + let levels = vec![Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![], + total_file_size: 0, + sub_level_id: 0, + uncompressed_file_size: 0, + }]; + let mut levels = Levels { + levels, + l0: Some(OverlappingLevel { + sub_levels: vec![], + total_file_size: 0, + uncompressed_file_size: 0, + }), + member_table_ids: vec![1], + ..Default::default() + }; + push_tables_level0_nonoverlapping( + &mut levels, + vec![ + generate_table(1, 1, 100, 300, 2), + generate_table(2, 1, 350, 500, 2), + ], + ); + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + // trivial_move + ret.add_pending_task(0, &mut levels_handler); // pending only for test + push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]); + let config: CompactionConfig = CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .max_compaction_bytes(1000) + .sub_level_max_compaction_bytes(150) + .max_bytes_for_level_multiplier(1) + .level0_sub_level_compact_level_count(3) + .build(); + let mut picker = TierCompactionPicker::new(Arc::new(config)); + + let ret: Option = + picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); + } + + #[test] + fn test_compacting_key_range_overlap_intra_l0() { + // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's + // compacting_key_range. 
+ let mut picker = create_compaction_picker_for_test(); + + let mut levels = Levels { + levels: vec![Level { + level_idx: 1, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![generate_table(3, 1, 200, 300, 2)], + total_file_size: 0, + sub_level_id: 0, + uncompressed_file_size: 0, + }], + l0: Some(generate_l0_nonoverlapping_sublevels(vec![ + generate_table(1, 1, 100, 210, 2), + generate_table(2, 1, 200, 250, 2), + ])), + member_table_ids: vec![1], + ..Default::default() + }; + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + ret.add_pending_task(0, &mut levels_handler); + + push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3)); + assert!(picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .is_none()); + } + + #[test] + fn test_pick_l0_intra() { + { + let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ + vec![ + generate_table(6, 1, 50, 99, 1), + generate_table(1, 1, 100, 200, 1), + generate_table(2, 1, 250, 300, 1), + ], + vec![ + generate_table(3, 1, 10, 90, 1), + generate_table(6, 1, 100, 110, 1), + ], + vec![ + generate_table(4, 1, 50, 99, 1), + generate_table(5, 1, 100, 200, 1), + ], + ]); + let levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_sub_level_compact_level_count(1) + .level0_overlapping_sub_level_compact_level_count(4) + .build(), + ); + let mut picker = IntraCompactionPicker::new(config); + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker + 
.pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + ret.add_pending_task(1, &mut levels_handler); + assert_eq!( + ret.input_levels + .iter() + .map(|i| i.table_infos.len()) + .sum::<usize>(), + 3 + ); + } + + { + // Suppose keyguard [100, 200] [300, 400] + // will pick sst [1, 3, 4] + let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ + vec![ + generate_table(1, 1, 100, 200, 1), + generate_table(2, 1, 300, 400, 1), + ], + vec![ + generate_table(3, 1, 100, 200, 1), + generate_table(6, 1, 300, 500, 1), + ], + vec![ + generate_table(4, 1, 100, 200, 1), + generate_table(5, 1, 300, 400, 1), + ], + ]); + let levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_sub_level_compact_level_count(1) + .build(), + ); + let mut picker = IntraCompactionPicker::new(config); + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + ret.add_pending_task(1, &mut levels_handler); + assert_eq!( + ret.input_levels + .iter() + .map(|i| i.table_infos.len()) + .sum::<usize>(), + 3 + ); + + assert_eq!(4, ret.input_levels[0].table_infos[0].get_sst_id()); + assert_eq!(3, ret.input_levels[1].table_infos[0].get_sst_id()); + assert_eq!(1, ret.input_levels[2].table_infos[0].get_sst_id()); + + // will pick sst [2, 6, 5] + let ret2 = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + + assert_eq!( + ret2.input_levels + .iter() + .map(|i| i.table_infos.len()) + .sum::<usize>(), + 3 + ); + + assert_eq!(5, ret2.input_levels[0].table_infos[0].get_sst_id()); + assert_eq!(6, ret2.input_levels[1].table_infos[0].get_sst_id()); + 
assert_eq!(2, ret2.input_levels[2].table_infos[0].get_sst_id()); + } + + { + let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ + vec![ + generate_table(1, 1, 100, 149, 1), + generate_table(6, 1, 150, 199, 1), + generate_table(7, 1, 200, 250, 1), + generate_table(2, 1, 300, 400, 1), + ], + vec![ + generate_table(3, 1, 100, 149, 1), + generate_table(8, 1, 150, 199, 1), + generate_table(9, 1, 200, 250, 1), + generate_table(10, 1, 300, 400, 1), + ], + vec![ + generate_table(4, 1, 100, 199, 1), + generate_table(11, 1, 200, 250, 1), + generate_table(5, 1, 300, 350, 1), + ], + ]); + let levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_sub_level_compact_level_count(1) + .build(), + ); + let mut picker = IntraCompactionPicker::new(config); + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + ret.add_pending_task(1, &mut levels_handler); + assert_eq!( + ret.input_levels + .iter() + .map(|i| i.table_infos.len()) + .sum::<usize>(), + 3 + ); + + assert_eq!(11, ret.input_levels[0].table_infos[0].get_sst_id()); + assert_eq!(9, ret.input_levels[1].table_infos[0].get_sst_id()); + assert_eq!(7, ret.input_levels[2].table_infos[0].get_sst_id()); + + let ret2 = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + + assert_eq!( + ret2.input_levels + .iter() + .map(|i| i.table_infos.len()) + .sum::<usize>(), + 3 + ); + + assert_eq!(5, ret2.input_levels[0].table_infos[0].get_sst_id()); + assert_eq!(10, ret2.input_levels[1].table_infos[0].get_sst_id()); + assert_eq!(2, ret2.input_levels[2].table_infos[0].get_sst_id()); + } + 
} + + fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool { + compaction_input.input_levels.len() == 2 + && !compaction_input.input_levels[0].table_infos.is_empty() + && compaction_input.input_levels[1].table_infos.is_empty() + } + + #[test] + fn test_trivial_move() { + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .target_file_size_base(30) + .level0_sub_level_compact_level_count(20) // reject intra + .build(), + ); + let mut picker = IntraCompactionPicker::new(config); + + // Cannot trivial move because there is only 1 sub-level. + let l0 = generate_l0_overlapping_sublevels(vec![vec![ + generate_table(1, 1, 100, 110, 1), + generate_table(2, 1, 150, 250, 1), + ]]); + let levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); + let mut local_stats = LocalPickerStatistic::default(); + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); + + // Cannot trivial move because sub-levels are overlapping + let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![ + vec![ + generate_table(1, 1, 100, 110, 1), + generate_table(2, 1, 150, 250, 1), + ], + vec![generate_table(3, 1, 10, 90, 1)], + vec![generate_table(4, 1, 10, 90, 1)], + vec![generate_table(5, 1, 10, 90, 1)], + ]); + let mut levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + assert!(picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .is_none()); + + // Cannot trivial move because latter sub-level is overlapping + levels.l0.as_mut().unwrap().sub_levels[0].level_type = 
LevelType::Nonoverlapping as i32; + levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Overlapping as i32; + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); + + // Cannot trivial move because former sub-level is overlapping + levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Overlapping as i32; + levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Nonoverlapping as i32; + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); + + // trivial move + levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Nonoverlapping as i32; + levels.l0.as_mut().unwrap().sub_levels[1].level_type = LevelType::Nonoverlapping as i32; + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + assert!(is_l0_trivial_move(&ret)); + assert_eq!(ret.input_levels[0].table_infos.len(), 1); + } + + #[test] + fn test_issue_11154() { + let mut local_stats = LocalPickerStatistic::default(); + let mut l0 = generate_l0_overlapping_sublevels(vec![ + vec![ + generate_table(4, 1, 1, 200, 1), + generate_table(5, 1, 400, 600, 1), + ], + vec![ + generate_table(6, 1, 1, 200, 1), + generate_table(7, 1, 400, 600, 1), + ], + vec![ + generate_table(8, 1, 1, 200, 1), + generate_table(9, 1, 400, 600, 1), + ], + vec![generate_table(10, 1, 1, 600, 1)], + ]); + // We can set level_type only because the input above is valid. + for s in &mut l0.sub_levels { + s.level_type = LevelType::Nonoverlapping as i32; + } + let levels = Levels { + l0: Some(l0), + levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])], + member_table_ids: vec![1], + ..Default::default() + }; + let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + + // Pick with large max_compaction_bytes results all sub levels included in input. 
+ let config = Arc::new( + CompactionConfigBuilder::new() + .max_compaction_bytes(800) + .sub_level_max_compaction_bytes(50000) + .max_bytes_for_level_base(500000) + .level0_sub_level_compact_level_count(1) + .build(), + ); + // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION. + // So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION. + let mut picker = IntraCompactionPicker::new(config); + let ret = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); + // avoid add sst_10 and cause a big task + assert_eq!(3, ret.input_levels.len()); + } +} diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 04e0550b8413c..15e7a61f548ee 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod base_level_compaction_picker; +mod intra_compaction_picker; mod manual_compaction_picker; mod min_overlap_compaction_picker; mod space_reclaim_compaction_picker; @@ -25,6 +26,7 @@ mod compaction_task_validator; pub use base_level_compaction_picker::LevelCompactionPicker; pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType}; +pub use intra_compaction_picker::IntraCompactionPicker; pub use manual_compaction_picker::ManualCompactionPicker; pub use min_overlap_compaction_picker::MinOverlappingPicker; use risingwave_pb::hummock::hummock_version::Levels; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index cb5edc28a32f3..ca5418cbe6a0f 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -500,17 +500,6 @@ pub(crate) mod tests { .unwrap() .to_vec(); assert_eq!(get_val, val); - - // 6. 
get compact task and there should be none - let compact_task = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_level_selector(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(6, compact_task.target_level); } pub(crate) async fn flush_and_commit(