Skip to content

Commit 8dfae83

Browse files
authored
fix(storage): Remove ambiguous configuration max_sub_compaction (#16960)
1 parent 4e64389 commit 8dfae83

File tree

7 files changed

+23
-25
lines changed

7 files changed

+23
-25
lines changed

proto/hummock.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,8 @@ message CompactTask {
382382
map<uint32, TableWatermarks> table_watermarks = 24;
383383
// The table schemas that are at least as new as the one used to create `input_ssts`.
384384
map<uint32, TableSchema> table_schemas = 25;
385+
// Max sub compaction task numbers
386+
uint32 max_sub_compaction = 26;
385387
}
386388

387389
message LevelHandler {

src/common/src/config.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -721,10 +721,6 @@ pub struct StorageConfig {
721721
#[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
722722
pub min_sst_size_for_streaming_upload: u64,
723723

724-
/// Max sub compaction task numbers
725-
#[serde(default = "default::storage::max_sub_compaction")]
726-
pub max_sub_compaction: u32,
727-
728724
#[serde(default = "default::storage::max_concurrent_compaction_task_number")]
729725
pub max_concurrent_compaction_task_number: u64,
730726

@@ -1461,10 +1457,6 @@ pub mod default {
14611457
32 * 1024 * 1024
14621458
}
14631459

1464-
pub fn max_sub_compaction() -> u32 {
1465-
4
1466-
}
1467-
14681460
pub fn max_concurrent_compaction_task_number() -> u64 {
14691461
16
14701462
}

src/config/docs.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ This page is automatically generated by `./risedev generate-example-config`
121121
| max_prefetch_block_number | max prefetch block number | 16 |
122122
| max_preload_io_retry_times | | 3 |
123123
| max_preload_wait_time_mill | | 0 |
124-
| max_sub_compaction | Max sub compaction task numbers | 4 |
125124
| max_version_pinning_duration_sec | | 10800 |
126125
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
127126
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |

src/config/example.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ compactor_memory_available_proportion = 0.8
136136
sstable_id_remote_fetch_number = 10
137137
min_sstable_size_mb = 32
138138
min_sst_size_for_streaming_upload = 33554432
139-
max_sub_compaction = 4
140139
max_concurrent_compaction_task_number = 16
141140
max_preload_wait_time_mill = 0
142141
max_version_pinning_duration_sec = 10800

src/meta/src/hummock/manager/compaction.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ impl HummockManager {
774774
target_sub_level_id: compact_task.input.target_sub_level_id,
775775
task_type: compact_task.compaction_task_type as i32,
776776
split_weight_by_vnode: vnode_partition_count,
777+
max_sub_compaction: group_config.compaction_config.max_sub_compaction,
777778
..Default::default()
778779
};
779780

src/storage/src/hummock/compactor/compaction_utils.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ use risingwave_hummock_sdk::table_stats::TableStatsMap;
2828
use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
2929
use risingwave_pb::hummock::compact_task::TaskType;
3030
use risingwave_pb::hummock::{
31-
compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
32-
TableSchema,
31+
compact_task, BloomFilterType, CompactTask, LevelType, PbKeyRange, SstableInfo, TableSchema,
3332
};
3433
use tokio::time::Instant;
3534

@@ -178,15 +177,16 @@ fn generate_splits_fast(
178177
sstable_infos: &Vec<SstableInfo>,
179178
compaction_size: u64,
180179
context: &CompactorContext,
181-
) -> Vec<KeyRange_vec> {
180+
max_sub_compaction: u32,
181+
) -> Vec<PbKeyRange> {
182182
let worker_num = context.compaction_executor.worker_num();
183183
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
184184

185185
let parallelism = calculate_task_parallelism_impl(
186186
worker_num,
187187
parallel_compact_size,
188188
compaction_size,
189-
context.storage_opts.max_sub_compaction,
189+
max_sub_compaction,
190190
);
191191
let mut indexes = vec![];
192192
for sst in sstable_infos {
@@ -213,13 +213,13 @@ fn generate_splits_fast(
213213
}
214214

215215
let mut splits = vec![];
216-
splits.push(KeyRange_vec::new(vec![], vec![]));
216+
splits.push(PbKeyRange::new(vec![], vec![]));
217217
let parallel_key_count = indexes.len() / parallelism;
218218
let mut last_split_key_count = 0;
219219
for key in indexes {
220220
if last_split_key_count >= parallel_key_count {
221221
splits.last_mut().unwrap().right.clone_from(&key);
222-
splits.push(KeyRange_vec::new(key.clone(), vec![]));
222+
splits.push(PbKeyRange::new(key.clone(), vec![]));
223223
last_split_key_count = 0;
224224
}
225225
last_split_key_count += 1;
@@ -232,7 +232,8 @@ pub async fn generate_splits(
232232
sstable_infos: &Vec<SstableInfo>,
233233
compaction_size: u64,
234234
context: &CompactorContext,
235-
) -> HummockResult<Vec<KeyRange_vec>> {
235+
max_sub_compaction: u32,
236+
) -> HummockResult<Vec<PbKeyRange>> {
236237
const MAX_FILE_COUNT: usize = 32;
237238
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
238239
if compaction_size > parallel_compact_size {
@@ -241,6 +242,7 @@ pub async fn generate_splits(
241242
sstable_infos,
242243
compaction_size,
243244
context,
245+
max_sub_compaction,
244246
));
245247
}
246248
let mut indexes = vec![];
@@ -269,13 +271,13 @@ pub async fn generate_splits(
269271
// sort by key, as for every data block has the same size;
270272
indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
271273
let mut splits = vec![];
272-
splits.push(KeyRange_vec::new(vec![], vec![]));
274+
splits.push(PbKeyRange::new(vec![], vec![]));
273275

274276
let parallelism = calculate_task_parallelism_impl(
275277
context.compaction_executor.worker_num(),
276278
parallel_compact_size,
277279
compaction_size,
278-
context.storage_opts.max_sub_compaction,
280+
max_sub_compaction,
279281
);
280282

281283
let sub_compaction_data_size =
@@ -291,7 +293,7 @@ pub async fn generate_splits(
291293
&& remaining_size > parallel_compact_size
292294
{
293295
splits.last_mut().unwrap().right.clone_from(&key);
294-
splits.push(KeyRange_vec::new(key.clone(), vec![]));
296+
splits.push(PbKeyRange::new(key.clone(), vec![]));
295297
last_buffer_size = data_size;
296298
} else {
297299
last_buffer_size += data_size;
@@ -577,7 +579,13 @@ pub async fn generate_splits_for_task(
577579
.sum::<u64>();
578580

579581
if !optimize_by_copy_block {
580-
let splits = generate_splits(&sstable_infos, compaction_size, context).await?;
582+
let splits = generate_splits(
583+
&sstable_infos,
584+
compaction_size,
585+
context,
586+
compact_task.get_max_sub_compaction(),
587+
)
588+
.await?;
581589
if !splits.is_empty() {
582590
compact_task.splits = splits;
583591
}
@@ -659,7 +667,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto
659667
context.compaction_executor.worker_num(),
660668
parallel_compact_size,
661669
compaction_size,
662-
context.storage_opts.max_sub_compaction,
670+
compact_task.get_max_sub_compaction(),
663671
)
664672
}
665673

src/storage/src/opts.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ pub struct StorageOpts {
7474
pub sstable_id_remote_fetch_number: u32,
7575
/// Whether to enable streaming upload for sstable.
7676
pub min_sst_size_for_streaming_upload: u64,
77-
/// Max sub compaction task numbers
78-
pub max_sub_compaction: u32,
7977
pub max_concurrent_compaction_task_number: u64,
8078
pub max_version_pinning_duration_sec: u64,
8179
pub compactor_iter_max_io_retry_times: usize,
@@ -176,7 +174,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
176174
compactor_memory_limit_mb: s.compactor_memory_limit_mb,
177175
sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number,
178176
min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload,
179-
max_sub_compaction: c.storage.max_sub_compaction,
180177
max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number,
181178
max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec,
182179
data_file_cache_dir: c.storage.data_file_cache.dir.clone(),

0 commit comments

Comments
 (0)