@@ -20,14 +20,18 @@ use risingwave_pb::hummock::hummock_version::Levels;
20
20
use risingwave_pb:: hummock:: { CompactionConfig , InputLevel , Level , LevelType , OverlappingLevel } ;
21
21
22
22
use super :: min_overlap_compaction_picker:: NonOverlapSubLevelPicker ;
23
- use super :: { CompactionInput , CompactionPicker , LocalPickerStatistic } ;
23
+ use super :: {
24
+ CompactionInput , CompactionPicker , CompactionTaskValidator , LocalPickerStatistic ,
25
+ ValidationRuleType ,
26
+ } ;
24
27
use crate :: hummock:: compaction:: create_overlap_strategy;
25
28
use crate :: hummock:: compaction:: picker:: TrivialMovePicker ;
26
29
use crate :: hummock:: level_handler:: LevelHandler ;
27
30
28
31
pub struct LevelCompactionPicker {
29
32
target_level : usize ,
30
33
config : Arc < CompactionConfig > ,
34
+ compaction_task_validator : Arc < CompactionTaskValidator > ,
31
35
}
32
36
33
37
impl CompactionPicker for LevelCompactionPicker {
@@ -80,13 +84,27 @@ impl CompactionPicker for LevelCompactionPicker {
80
84
}
81
85
82
86
impl LevelCompactionPicker {
87
+ #[ cfg( test) ]
83
88
pub fn new ( target_level : usize , config : Arc < CompactionConfig > ) -> LevelCompactionPicker {
84
89
LevelCompactionPicker {
85
90
target_level,
91
+ compaction_task_validator : Arc :: new ( CompactionTaskValidator :: new ( config. clone ( ) ) ) ,
86
92
config,
87
93
}
88
94
}
89
95
96
+ pub fn new_with_validator (
97
+ target_level : usize ,
98
+ config : Arc < CompactionConfig > ,
99
+ compaction_task_validator : Arc < CompactionTaskValidator > ,
100
+ ) -> LevelCompactionPicker {
101
+ LevelCompactionPicker {
102
+ target_level,
103
+ config,
104
+ compaction_task_validator,
105
+ }
106
+ }
107
+
90
108
fn pick_base_trivial_move (
91
109
& self ,
92
110
l0 : & OverlappingLevel ,
@@ -113,10 +131,10 @@ impl LevelCompactionPicker {
113
131
level_handlers : & [ LevelHandler ] ,
114
132
stats : & mut LocalPickerStatistic ,
115
133
) -> Option < CompactionInput > {
134
+ // TODO: remove this
116
135
let l0_size = l0. total_file_size - level_handlers[ 0 ] . get_pending_file_size ( ) ;
117
136
let base_level_size = target_level. total_file_size
118
137
- level_handlers[ target_level. level_idx as usize ] . get_pending_file_size ( ) ;
119
-
120
138
if l0_size < base_level_size {
121
139
stats. skip_by_write_amp_limit += 1 ;
122
140
return None ;
@@ -153,7 +171,7 @@ impl LevelCompactionPicker {
153
171
154
172
let mut skip_by_pending = false ;
155
173
let mut input_levels = vec ! [ ] ;
156
- let mut min_write_amp_meet = false ;
174
+
157
175
for input in l0_select_tables_vec {
158
176
let l0_select_tables = input
159
177
. sstable_infos
@@ -180,16 +198,6 @@ impl LevelCompactionPicker {
180
198
continue ;
181
199
}
182
200
183
- // The size of target level may be too large, we shall skip this compact task and wait
184
- // the data in base level compact to lower level.
185
- if target_level_size > self . config . max_compaction_bytes && strict_check {
186
- continue ;
187
- }
188
-
189
- if input. total_file_size >= target_level_size {
190
- min_write_amp_meet = true ;
191
- }
192
-
193
201
input_levels. push ( ( input, target_level_size, target_level_ssts) ) ;
194
202
}
195
203
@@ -200,20 +208,7 @@ impl LevelCompactionPicker {
200
208
return None ;
201
209
}
202
210
203
- if !min_write_amp_meet && strict_check {
204
- // If the write-amplification of all candidate task are large, we may hope to wait base
205
- // level compact more data to lower level. But if we skip all task, I'm
206
- // afraid the data will be blocked in level0 and will be never compacted to base level.
207
- // So we only allow one task exceed write-amplification-limit running in
208
- // level0 to base-level.
209
- return None ;
210
- }
211
-
212
211
for ( input, target_file_size, target_level_files) in input_levels {
213
- if min_write_amp_meet && input. total_file_size < target_file_size {
214
- continue ;
215
- }
216
-
217
212
let mut select_level_inputs = input
218
213
. sstable_infos
219
214
. into_iter ( )
@@ -224,18 +219,33 @@ impl LevelCompactionPicker {
224
219
} )
225
220
. collect_vec ( ) ;
226
221
select_level_inputs. reverse ( ) ;
222
+ let target_file_count = target_level_files. len ( ) ;
227
223
select_level_inputs. push ( InputLevel {
228
224
level_idx : target_level. level_idx ,
229
225
level_type : target_level. level_type ,
230
226
table_infos : target_level_files,
231
227
} ) ;
232
- return Some ( CompactionInput {
228
+
229
+ let result = CompactionInput {
233
230
input_levels : select_level_inputs,
234
231
target_level : self . target_level ,
235
- target_sub_level_id : 0 ,
236
- } ) ;
232
+ select_input_size : input. total_file_size ,
233
+ target_input_size : target_file_size,
234
+ total_file_count : ( input. total_file_count + target_file_count) as u64 ,
235
+ ..Default :: default ( )
236
+ } ;
237
+
238
+ if !self . compaction_task_validator . valid_compact_task (
239
+ & result,
240
+ ValidationRuleType :: ToBase ,
241
+ stats,
242
+ ) && strict_check
243
+ {
244
+ continue ;
245
+ }
246
+
247
+ return Some ( result) ;
237
248
}
238
- stats. skip_by_write_amp_limit += 1 ;
239
249
None
240
250
}
241
251
}
@@ -632,6 +642,7 @@ pub mod tests {
632
642
} ] ,
633
643
target_level : 1 ,
634
644
target_sub_level_id : pending_level. sub_level_id ,
645
+ ..Default :: default ( )
635
646
} ;
636
647
assert ! ( !levels_handler[ 0 ] . is_level_pending_compact( & pending_level) ) ;
637
648
tier_task_input. add_pending_task ( 1 , & mut levels_handler) ;
@@ -649,7 +660,6 @@ pub mod tests {
649
660
// But stopped by pending sub-level when trying to include more sub-levels.
650
661
let mut picker = LevelCompactionPicker :: new ( 1 , config. clone ( ) ) ;
651
662
let ret = picker. pick_compaction ( & levels, & levels_handler, & mut local_stats) ;
652
-
653
663
assert ! ( ret. is_none( ) ) ;
654
664
655
665
// Free the pending sub-level.
@@ -695,7 +705,6 @@ pub mod tests {
695
705
let ret = picker
696
706
. pick_compaction ( & levels, & levels_handler, & mut local_stats)
697
707
. unwrap ( ) ;
698
- // println!("ret.input_levels: {:?}", ret.input_levels);
699
708
// 1. trivial_move
700
709
assert_eq ! ( 2 , ret. input_levels. len( ) ) ;
701
710
assert ! ( ret. input_levels[ 1 ] . table_infos. is_empty( ) ) ;
@@ -705,7 +714,6 @@ pub mod tests {
705
714
let ret = picker
706
715
. pick_compaction ( & levels, & levels_handler, & mut local_stats)
707
716
. unwrap ( ) ;
708
- println ! ( "ret.input_levels: {:?}" , ret. input_levels) ;
709
717
assert_eq ! ( 3 , ret. input_levels. len( ) ) ;
710
718
assert_eq ! ( 6 , ret. input_levels[ 0 ] . table_infos[ 0 ] . sst_id) ;
711
719
}
0 commit comments