@@ -20,14 +20,18 @@ use risingwave_pb::hummock::hummock_version::Levels;
20
20
use risingwave_pb:: hummock:: { CompactionConfig , InputLevel , Level , LevelType , OverlappingLevel } ;
21
21
22
22
use super :: min_overlap_compaction_picker:: NonOverlapSubLevelPicker ;
23
- use super :: { CompactionInput , CompactionPicker , LocalPickerStatistic } ;
23
+ use super :: {
24
+ CompactionInput , CompactionPicker , CompactionTaskValidator , LocalPickerStatistic ,
25
+ ValidationRuleType ,
26
+ } ;
24
27
use crate :: hummock:: compaction:: create_overlap_strategy;
25
28
use crate :: hummock:: compaction:: picker:: TrivialMovePicker ;
26
29
use crate :: hummock:: level_handler:: LevelHandler ;
27
30
28
31
pub struct LevelCompactionPicker {
29
32
target_level : usize ,
30
33
config : Arc < CompactionConfig > ,
34
+ compaction_task_validator : Arc < CompactionTaskValidator > ,
31
35
}
32
36
33
37
impl CompactionPicker for LevelCompactionPicker {
@@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker {
84
88
}
85
89
86
90
impl LevelCompactionPicker {
91
+ #[ cfg( test) ]
87
92
pub fn new ( target_level : usize , config : Arc < CompactionConfig > ) -> LevelCompactionPicker {
88
93
LevelCompactionPicker {
89
94
target_level,
95
+ compaction_task_validator : Arc :: new ( CompactionTaskValidator :: new ( config. clone ( ) ) ) ,
90
96
config,
91
97
}
92
98
}
93
99
100
+ pub fn new_with_validator (
101
+ target_level : usize ,
102
+ config : Arc < CompactionConfig > ,
103
+ compaction_task_validator : Arc < CompactionTaskValidator > ,
104
+ ) -> LevelCompactionPicker {
105
+ LevelCompactionPicker {
106
+ target_level,
107
+ config,
108
+ compaction_task_validator,
109
+ }
110
+ }
111
+
94
112
fn pick_base_trivial_move (
95
113
& self ,
96
114
l0 : & OverlappingLevel ,
@@ -117,10 +135,10 @@ impl LevelCompactionPicker {
117
135
level_handlers : & [ LevelHandler ] ,
118
136
stats : & mut LocalPickerStatistic ,
119
137
) -> Option < CompactionInput > {
138
+ // TODO: remove this
120
139
let l0_size = l0. total_file_size - level_handlers[ 0 ] . get_pending_file_size ( ) ;
121
140
let base_level_size = target_level. total_file_size
122
141
- level_handlers[ target_level. level_idx as usize ] . get_pending_file_size ( ) ;
123
-
124
142
if l0_size < base_level_size {
125
143
stats. skip_by_write_amp_limit += 1 ;
126
144
return None ;
@@ -157,7 +175,7 @@ impl LevelCompactionPicker {
157
175
158
176
let mut skip_by_pending = false ;
159
177
let mut input_levels = vec ! [ ] ;
160
- let mut min_write_amp_meet = false ;
178
+
161
179
for input in l0_select_tables_vec {
162
180
let l0_select_tables = input
163
181
. sstable_infos
@@ -184,16 +202,6 @@ impl LevelCompactionPicker {
184
202
continue ;
185
203
}
186
204
187
- // The size of target level may be too large, we shall skip this compact task and wait
188
- // the data in base level compact to lower level.
189
- if target_level_size > self . config . max_compaction_bytes && strict_check {
190
- continue ;
191
- }
192
-
193
- if input. total_file_size >= target_level_size {
194
- min_write_amp_meet = true ;
195
- }
196
-
197
205
input_levels. push ( ( input, target_level_size, target_level_ssts) ) ;
198
206
}
199
207
@@ -204,20 +212,7 @@ impl LevelCompactionPicker {
204
212
return None ;
205
213
}
206
214
207
- if !min_write_amp_meet && strict_check {
208
- // If the write-amplification of all candidate task are large, we may hope to wait base
209
- // level compact more data to lower level. But if we skip all task, I'm
210
- // afraid the data will be blocked in level0 and will be never compacted to base level.
211
- // So we only allow one task exceed write-amplification-limit running in
212
- // level0 to base-level.
213
- return None ;
214
- }
215
-
216
215
for ( input, target_file_size, target_level_files) in input_levels {
217
- if min_write_amp_meet && input. total_file_size < target_file_size {
218
- continue ;
219
- }
220
-
221
216
let mut select_level_inputs = input
222
217
. sstable_infos
223
218
. into_iter ( )
@@ -228,18 +223,33 @@ impl LevelCompactionPicker {
228
223
} )
229
224
. collect_vec ( ) ;
230
225
select_level_inputs. reverse ( ) ;
226
+ let target_file_count = target_level_files. len ( ) ;
231
227
select_level_inputs. push ( InputLevel {
232
228
level_idx : target_level. level_idx ,
233
229
level_type : target_level. level_type ,
234
230
table_infos : target_level_files,
235
231
} ) ;
236
- return Some ( CompactionInput {
232
+
233
+ let result = CompactionInput {
237
234
input_levels : select_level_inputs,
238
235
target_level : self . target_level ,
239
- target_sub_level_id : 0 ,
240
- } ) ;
236
+ select_input_size : input. total_file_size ,
237
+ target_input_size : target_file_size,
238
+ total_file_count : ( input. total_file_count + target_file_count) as u64 ,
239
+ ..Default :: default ( )
240
+ } ;
241
+
242
+ if !self . compaction_task_validator . valid_compact_task (
243
+ & result,
244
+ ValidationRuleType :: ToBase ,
245
+ stats,
246
+ ) && strict_check
247
+ {
248
+ continue ;
249
+ }
250
+
251
+ return Some ( result) ;
241
252
}
242
- stats. skip_by_write_amp_limit += 1 ;
243
253
None
244
254
}
245
255
@@ -267,8 +277,6 @@ impl LevelCompactionPicker {
267
277
self . config . sub_level_max_compaction_bytes ,
268
278
) ;
269
279
270
- let tier_sub_level_compact_level_count =
271
- self . config . level0_sub_level_compact_level_count as usize ;
272
280
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker :: new (
273
281
self . config . sub_level_max_compaction_bytes / 2 ,
274
282
max_compaction_bytes,
@@ -284,18 +292,10 @@ impl LevelCompactionPicker {
284
292
continue ;
285
293
}
286
294
287
- let mut skip_by_write_amp = false ;
288
- // Limit the number of selection levels for the non-overlapping
289
- // sub_level at least level0_sub_level_compact_level_count
290
- for ( plan_index, input) in l0_select_tables_vec. into_iter ( ) . enumerate ( ) {
291
- if plan_index == 0
292
- && input. sstable_infos . len ( )
293
- < self . config . level0_sub_level_compact_level_count as usize
294
- {
295
- // first plan level count smaller than limit
296
- break ;
297
- }
298
-
295
+ let validator = CompactionTaskValidator :: new ( self . config . clone ( ) ) ;
296
+ let mut select_input_size = 0 ;
297
+ let mut total_file_count = 0 ;
298
+ for input in l0_select_tables_vec {
299
299
let mut max_level_size = 0 ;
300
300
for level_select_table in & input. sstable_infos {
301
301
let level_select_size = level_select_table
@@ -306,22 +306,6 @@ impl LevelCompactionPicker {
306
306
max_level_size = std:: cmp:: max ( max_level_size, level_select_size) ;
307
307
}
308
308
309
- // This limitation would keep our write-amplification no more than
310
- // ln(max_compaction_bytes/flush_level_bytes) /
311
- // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half
312
- // of level0_sub_level_compact_level_count just for convenient.
313
- let is_write_amp_large =
314
- max_level_size * self . config . level0_sub_level_compact_level_count as u64 / 2
315
- >= input. total_file_size ;
316
-
317
- if ( is_write_amp_large
318
- || input. sstable_infos . len ( ) < tier_sub_level_compact_level_count)
319
- && input. total_file_count < self . config . level0_max_compact_file_number as usize
320
- {
321
- skip_by_write_amp = true ;
322
- continue ;
323
- }
324
-
325
309
let mut select_level_inputs = Vec :: with_capacity ( input. sstable_infos . len ( ) ) ;
326
310
for level_select_sst in input. sstable_infos {
327
311
if level_select_sst. is_empty ( ) {
@@ -332,17 +316,25 @@ impl LevelCompactionPicker {
332
316
level_type : LevelType :: Nonoverlapping as i32 ,
333
317
table_infos : level_select_sst,
334
318
} ) ;
319
+
320
+ select_input_size += input. total_file_size ;
321
+ total_file_count += input. total_file_count ;
335
322
}
336
323
select_level_inputs. reverse ( ) ;
337
- return Some ( CompactionInput {
324
+
325
+ let result = CompactionInput {
338
326
input_levels : select_level_inputs,
339
- target_level : 0 ,
340
327
target_sub_level_id : level. sub_level_id ,
341
- } ) ;
342
- }
328
+ select_input_size,
329
+ total_file_count : total_file_count as u64 ,
330
+ ..Default :: default ( )
331
+ } ;
343
332
344
- if skip_by_write_amp {
345
- stats. skip_by_write_amp_limit += 1 ;
333
+ if !validator. valid_compact_task ( & result, ValidationRuleType :: Intra , stats) {
334
+ continue ;
335
+ }
336
+
337
+ return Some ( result) ;
346
338
}
347
339
}
348
340
@@ -402,6 +394,7 @@ impl LevelCompactionPicker {
402
394
target_level_idx -= 1 ;
403
395
}
404
396
397
+ let select_input_size = select_sst. file_size ;
405
398
let input_levels = vec ! [
406
399
InputLevel {
407
400
level_idx: 0 ,
@@ -418,6 +411,9 @@ impl LevelCompactionPicker {
418
411
input_levels,
419
412
target_level : 0 ,
420
413
target_sub_level_id : l0. sub_levels [ target_level_idx] . sub_level_id ,
414
+ select_input_size,
415
+ total_file_count : 1 ,
416
+ ..Default :: default ( )
421
417
} ) ;
422
418
}
423
419
None
@@ -901,6 +897,7 @@ pub mod tests {
901
897
} ] ,
902
898
target_level : 1 ,
903
899
target_sub_level_id : pending_level. sub_level_id ,
900
+ ..Default :: default ( )
904
901
} ;
905
902
assert ! ( !levels_handler[ 0 ] . is_level_pending_compact( & pending_level) ) ;
906
903
tier_task_input. add_pending_task ( 1 , & mut levels_handler) ;
@@ -918,7 +915,6 @@ pub mod tests {
918
915
// But stopped by pending sub-level when trying to include more sub-levels.
919
916
let mut picker = LevelCompactionPicker :: new ( 1 , config. clone ( ) ) ;
920
917
let ret = picker. pick_compaction ( & levels, & levels_handler, & mut local_stats) ;
921
-
922
918
assert ! ( ret. is_none( ) ) ;
923
919
924
920
// Free the pending sub-level.
@@ -964,7 +960,6 @@ pub mod tests {
964
960
let ret = picker
965
961
. pick_compaction ( & levels, & levels_handler, & mut local_stats)
966
962
. unwrap ( ) ;
967
- // println!("ret.input_levels: {:?}", ret.input_levels);
968
963
// 1. trivial_move
969
964
assert_eq ! ( 2 , ret. input_levels. len( ) ) ;
970
965
assert ! ( ret. input_levels[ 1 ] . table_infos. is_empty( ) ) ;
@@ -974,7 +969,6 @@ pub mod tests {
974
969
let ret = picker
975
970
. pick_compaction ( & levels, & levels_handler, & mut local_stats)
976
971
. unwrap ( ) ;
977
- println ! ( "ret.input_levels: {:?}" , ret. input_levels) ;
978
972
assert_eq ! ( 3 , ret. input_levels. len( ) ) ;
979
973
assert_eq ! ( 6 , ret. input_levels[ 0 ] . table_infos[ 0 ] . sst_id) ;
980
974
}
0 commit comments