@@ -26,8 +26,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
26
26
use risingwave_pb:: hummock:: { compact_task, CompactionConfig , LevelType } ;
27
27
28
28
use super :: picker:: {
29
- CompactionTaskValidator , SpaceReclaimCompactionPicker , SpaceReclaimPickerState , TtlPickerState ,
30
- TtlReclaimCompactionPicker ,
29
+ CompactionTaskValidator , IntraCompactionPicker , SpaceReclaimCompactionPicker ,
30
+ SpaceReclaimPickerState , TtlPickerState , TtlReclaimCompactionPicker ,
31
31
} ;
32
32
use super :: {
33
33
create_compaction_task, LevelCompactionPicker , ManualCompactionOption , ManualCompactionPicker ,
@@ -44,6 +44,23 @@ use crate::rpc::metrics::MetaMetrics;
44
44
45
45
pub const SCORE_BASE : u64 = 100 ;
46
46
47
+ #[ derive( Debug , Default , Clone ) ]
48
+ pub enum PickerType {
49
+ Tier ,
50
+ Intra ,
51
+ ToBase ,
52
+ #[ default]
53
+ BottomLevel ,
54
+ }
55
+
56
+ #[ derive( Default , Debug ) ]
57
+ pub struct PickerInfo {
58
+ score : u64 ,
59
+ select_level : usize ,
60
+ target_level : usize ,
61
+ picker_type : PickerType ,
62
+ }
63
+
47
64
pub trait LevelSelector : Sync + Send {
48
65
fn pick_compaction (
49
66
& mut self ,
@@ -71,7 +88,7 @@ pub struct SelectContext {
71
88
// size of the files in `base_level` reaches its capacity, we will place data in a higher
72
89
// level, which equals to `base_level -= 1;`.
73
90
pub base_level : usize ,
74
- pub score_levels : Vec < ( u64 , usize , usize ) > ,
91
+ pub score_levels : Vec < PickerInfo > ,
75
92
}
76
93
77
94
pub struct DynamicLevelSelectorCore {
@@ -92,33 +109,34 @@ impl DynamicLevelSelectorCore {
92
109
93
110
fn create_compaction_picker (
94
111
& self ,
95
- select_level : usize ,
96
- target_level : usize ,
112
+ picker_info : & PickerInfo ,
97
113
overlap_strategy : Arc < dyn OverlapStrategy > ,
98
114
compaction_task_validator : Arc < CompactionTaskValidator > ,
99
115
) -> Box < dyn CompactionPicker > {
100
- if select_level == 0 {
101
- if target_level == 0 {
102
- Box :: new ( TierCompactionPicker :: new_with_validator (
103
- self . config . clone ( ) ,
104
- compaction_task_validator,
105
- ) )
106
- } else {
107
- Box :: new ( LevelCompactionPicker :: new_with_validator (
108
- target_level,
109
- self . config . clone ( ) ,
110
- compaction_task_validator,
116
+ match picker_info. picker_type {
117
+ PickerType :: Tier => Box :: new ( TierCompactionPicker :: new_with_validator (
118
+ self . config . clone ( ) ,
119
+ compaction_task_validator,
120
+ ) ) ,
121
+ PickerType :: ToBase => Box :: new ( LevelCompactionPicker :: new_with_validator (
122
+ picker_info. target_level ,
123
+ self . config . clone ( ) ,
124
+ compaction_task_validator,
125
+ ) ) ,
126
+ PickerType :: Intra => Box :: new ( IntraCompactionPicker :: new_with_validator (
127
+ self . config . clone ( ) ,
128
+ compaction_task_validator,
129
+ ) ) ,
130
+ PickerType :: BottomLevel => {
131
+ assert_eq ! ( picker_info. select_level + 1 , picker_info. target_level) ;
132
+ Box :: new ( MinOverlappingPicker :: new (
133
+ picker_info. select_level ,
134
+ picker_info. target_level ,
135
+ self . config . max_bytes_for_level_base ,
136
+ self . config . split_by_state_table ,
137
+ overlap_strategy,
111
138
) )
112
139
}
113
- } else {
114
- assert_eq ! ( select_level + 1 , target_level) ;
115
- Box :: new ( MinOverlappingPicker :: new (
116
- select_level,
117
- target_level,
118
- self . config . max_bytes_for_level_base ,
119
- self . config . split_by_state_table ,
120
- overlap_strategy,
121
- ) )
122
140
}
123
141
}
124
142
@@ -217,8 +235,12 @@ impl DynamicLevelSelectorCore {
217
235
std:: cmp:: min ( idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
218
236
/ self . config . level0_tier_compact_file_number ;
219
237
// Reduce the level num of l0 overlapping sub_level
220
- ctx. score_levels
221
- . push ( ( std:: cmp:: max ( l0_overlapping_score, SCORE_BASE + 1 ) , 0 , 0 ) ) ;
238
+ ctx. score_levels . push ( PickerInfo {
239
+ score : std:: cmp:: max ( l0_overlapping_score, SCORE_BASE + 1 ) ,
240
+ select_level : 0 ,
241
+ target_level : 0 ,
242
+ picker_type : PickerType :: Tier ,
243
+ } )
222
244
}
223
245
224
246
// The read query at the non-overlapping level only selects ssts that match the query
@@ -254,8 +276,24 @@ impl DynamicLevelSelectorCore {
254
276
} ;
255
277
256
278
// Reduce the level num of l0 non-overlapping sub_level
257
- ctx. score_levels
258
- . push ( ( non_overlapping_score, 0 , ctx. base_level ) ) ;
279
+ ctx. score_levels . push ( {
280
+ PickerInfo {
281
+ score : non_overlapping_score,
282
+ select_level : 0 ,
283
+ target_level : ctx. base_level ,
284
+ picker_type : PickerType :: ToBase ,
285
+ }
286
+ } ) ;
287
+
288
+ // FIXME: more accurate score calculation algorithm will be introduced (#11903)
289
+ ctx. score_levels . push ( {
290
+ PickerInfo {
291
+ score : non_overlapping_score,
292
+ select_level : 0 ,
293
+ target_level : 0 ,
294
+ picker_type : PickerType :: Intra ,
295
+ }
296
+ } ) ;
259
297
}
260
298
261
299
// The bottommost level can not be input level.
@@ -275,16 +313,23 @@ impl DynamicLevelSelectorCore {
275
313
if total_size == 0 {
276
314
continue ;
277
315
}
278
- ctx. score_levels . push ( (
279
- total_size * SCORE_BASE / ctx. level_max_bytes [ level_idx] ,
280
- level_idx,
281
- level_idx + 1 ,
282
- ) ) ;
316
+
317
+ ctx. score_levels . push ( {
318
+ PickerInfo {
319
+ score : total_size * SCORE_BASE / ctx. level_max_bytes [ level_idx] ,
320
+ select_level : level_idx,
321
+ target_level : level_idx + 1 ,
322
+ picker_type : PickerType :: BottomLevel ,
323
+ }
324
+ } ) ;
283
325
}
284
326
285
327
// sort reverse to pick the largest one.
286
- ctx. score_levels
287
- . sort_by ( |a, b| b. 0 . cmp ( & a. 0 ) . then_with ( || a. 2 . cmp ( & b. 2 ) ) ) ;
328
+ ctx. score_levels . sort_by ( |a, b| {
329
+ b. score
330
+ . cmp ( & a. score )
331
+ . then_with ( || a. target_level . cmp ( & b. target_level ) )
332
+ } ) ;
288
333
ctx
289
334
}
290
335
@@ -379,21 +424,20 @@ impl LevelSelector for DynamicLevelSelector {
379
424
let overlap_strategy =
380
425
create_overlap_strategy ( compaction_group. compaction_config . compaction_mode ( ) ) ;
381
426
let ctx = dynamic_level_core. get_priority_levels ( levels, level_handlers) ;
382
-
383
427
// TODO: Determine which rule to enable by write limit
384
428
let compaction_task_validator = Arc :: new ( CompactionTaskValidator :: new (
385
429
compaction_group. compaction_config . clone ( ) ,
386
430
) ) ;
387
- for ( score , select_level , target_level ) in ctx. score_levels {
388
- if score <= SCORE_BASE {
431
+ for picker_info in & ctx. score_levels {
432
+ if picker_info . score <= SCORE_BASE {
389
433
return None ;
390
434
}
391
435
let mut picker = dynamic_level_core. create_compaction_picker (
392
- select_level,
393
- target_level,
436
+ picker_info,
394
437
overlap_strategy. clone ( ) ,
395
438
compaction_task_validator. clone ( ) ,
396
439
) ;
440
+
397
441
let mut stats = LocalPickerStatistic :: default ( ) ;
398
442
if let Some ( ret) = picker. pick_compaction ( levels, level_handlers, & mut stats) {
399
443
ret. add_pending_task ( task_id, level_handlers) ;
@@ -404,9 +448,11 @@ impl LevelSelector for DynamicLevelSelector {
404
448
self . task_type ( ) ,
405
449
) ) ;
406
450
}
407
- selector_stats
408
- . skip_picker
409
- . push ( ( select_level, target_level, stats) ) ;
451
+ selector_stats. skip_picker . push ( (
452
+ picker_info. select_level ,
453
+ picker_info. target_level ,
454
+ stats,
455
+ ) ) ;
410
456
}
411
457
None
412
458
}
0 commit comments