Skip to content

Commit 359fbf5

Browse files
feat(meta): support move state-table between compaction-group (risingwavelabs#8390)
Signed-off-by: Little-Wallace <[email protected]>
1 parent 52caa65 commit 359fbf5

File tree

12 files changed

+742
-368
lines changed

12 files changed

+742
-368
lines changed

proto/hummock.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ message GroupMetaChange {
7171
repeated uint32 table_ids_remove = 2;
7272
}
7373

74+
message GroupTableChange {
75+
repeated uint32 table_ids = 1;
76+
uint64 target_group_id = 2;
77+
uint64 origin_group_id = 3;
78+
uint64 new_sst_start_id = 4;
79+
}
80+
7481
message GroupDestroy {}
7582

7683
message GroupDelta {
@@ -79,6 +86,7 @@ message GroupDelta {
7986
GroupConstruct group_construct = 2;
8087
GroupDestroy group_destroy = 3;
8188
GroupMetaChange group_meta_change = 4;
89+
GroupTableChange group_table_change = 5;
8290
}
8391
}
8492

src/meta/src/hummock/manager/compaction_group_manager.rs

Lines changed: 157 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
2626
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
2727
use risingwave_hummock_sdk::CompactionGroupId;
2828
use risingwave_pb::hummock::group_delta::DeltaType;
29+
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
2930
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
3031
use risingwave_pb::hummock::{
31-
CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
32-
GroupMetaChange,
32+
compact_task, CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
33+
GroupMetaChange, GroupTableChange,
3334
};
3435
use tokio::sync::{OnceCell, RwLock};
3536

@@ -61,7 +62,7 @@ impl<S: MetaStore> HummockManager<S> {
6162
) -> Result<RwLock<CompactionGroupManager>> {
6263
let compaction_group_manager = RwLock::new(CompactionGroupManager {
6364
compaction_groups: BTreeMap::new(),
64-
provided_default_config_for_test: config,
65+
default_config: config,
6566
});
6667
compaction_group_manager
6768
.write()
@@ -421,11 +422,24 @@ impl<S: MetaStore> HummockManager<S> {
421422

422423
/// Splits a compaction group into two. The new one will contain `table_ids`.
423424
/// Returns the newly created compaction group id.
424-
#[named]
425425
pub async fn split_compaction_group(
426426
&self,
427427
parent_group_id: CompactionGroupId,
428428
table_ids: &[StateTableId],
429+
) -> Result<CompactionGroupId> {
430+
self.move_state_table_to_compaction_group(parent_group_id, table_ids, None, false)
431+
.await
432+
}
433+
434+
/// move some table to another compaction-group. Create a new compaction group if it does not
435+
/// exist.
436+
#[named]
437+
pub async fn move_state_table_to_compaction_group(
438+
&self,
439+
parent_group_id: CompactionGroupId,
440+
table_ids: &[StateTableId],
441+
target_group_id: Option<CompactionGroupId>,
442+
allow_split_by_table: bool,
429443
) -> Result<CompactionGroupId> {
430444
if table_ids.is_empty() {
431445
return Ok(parent_group_id);
@@ -453,120 +467,176 @@ impl<S: MetaStore> HummockManager<S> {
453467
parent_group_id
454468
)));
455469
}
470+
if let Some(compaction_group_id) = target_group_id {
471+
if !versioning.check_branched_sst_in_target_group(
472+
&table_ids,
473+
&parent_group_id,
474+
&compaction_group_id,
475+
) {
476+
return Err(Error::CompactionGroup(format!(
477+
"invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files",
478+
parent_group_id
479+
)));
480+
}
481+
}
456482

457483
let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
458484
&mut versioning.hummock_version_deltas,
459485
current_version.id + 1,
460486
build_version_delta_after_version(current_version),
461487
);
462-
463-
// Remove tables from parent group.
464-
for table_id in &table_ids {
465-
let group_deltas = &mut new_version_delta
466-
.group_deltas
467-
.entry(parent_group_id)
468-
.or_default()
469-
.group_deltas;
470-
group_deltas.push(GroupDelta {
471-
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange {
472-
table_ids_remove: vec![*table_id],
473-
..Default::default()
474-
})),
475-
});
476-
}
477-
478-
// Add tables to new group.
479-
let new_group_id = self
480-
.env
481-
.id_gen_manager()
482-
.generate::<{ IdCategory::CompactionGroup }>()
483-
.await?;
484488
let new_sst_start_id = self
485489
.env
486490
.id_gen_manager()
487491
.generate_interval::<{ IdCategory::HummockSstableId }>(
488-
versioning.current_version.count_new_ssts_in_group_split(
492+
current_version.count_new_ssts_in_group_split(
489493
parent_group_id,
490-
&HashSet::from_iter(table_ids.iter().cloned()),
494+
HashSet::from_iter(table_ids.clone()),
491495
),
492496
)
493497
.await?;
494-
let group_deltas = &mut new_version_delta
495-
.group_deltas
496-
.entry(new_group_id)
497-
.or_default()
498-
.group_deltas;
499-
let config = self
500-
.compaction_group_manager
501-
.read()
502-
.await
503-
.get_compaction_group_config(new_group_id)
504-
.compaction_config
505-
.as_ref()
506-
.clone();
507-
group_deltas.push(GroupDelta {
508-
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
509-
group_config: Some(config),
510-
group_id: new_group_id,
511-
parent_group_id,
512-
table_ids,
513-
new_sst_start_id,
514-
})),
515-
});
498+
let mut new_group = None;
499+
let target_compaction_group_id = match target_group_id {
500+
Some(compaction_group_id) => {
501+
match current_version.levels.get(&compaction_group_id) {
502+
Some(group) => {
503+
for table_id in &table_ids {
504+
if group.member_table_ids.contains(table_id) {
505+
return Err(Error::CompactionGroup(format!(
506+
"table {} already exist in group {}",
507+
*table_id, compaction_group_id,
508+
)));
509+
}
510+
}
511+
}
512+
None => {
513+
return Err(Error::CompactionGroup(format!(
514+
"target group {} does not exist",
515+
compaction_group_id,
516+
)));
517+
}
518+
}
519+
let group_deltas = &mut new_version_delta
520+
.group_deltas
521+
.entry(compaction_group_id)
522+
.or_default()
523+
.group_deltas;
524+
group_deltas.push(GroupDelta {
525+
delta_type: Some(DeltaType::GroupTableChange(GroupTableChange {
526+
table_ids: table_ids.to_vec(),
527+
origin_group_id: parent_group_id,
528+
target_group_id: compaction_group_id,
529+
new_sst_start_id,
530+
})),
531+
});
532+
compaction_group_id
533+
}
534+
None => {
535+
// All NewCompactionGroup pairs are mapped to one new compaction group.
536+
let new_compaction_group_id = self
537+
.env
538+
.id_gen_manager()
539+
.generate::<{ IdCategory::CompactionGroup }>()
540+
.await?;
541+
let mut config = self
542+
.compaction_group_manager
543+
.read()
544+
.await
545+
.get_default_compaction_group_config();
546+
config.split_by_state_table = allow_split_by_table;
547+
548+
new_version_delta.group_deltas.insert(
549+
new_compaction_group_id,
550+
GroupDeltas {
551+
group_deltas: vec![GroupDelta {
552+
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
553+
group_config: Some(config.clone()),
554+
group_id: new_compaction_group_id,
555+
parent_group_id,
556+
new_sst_start_id,
557+
table_ids: table_ids.to_vec(),
558+
})),
559+
}],
560+
},
561+
);
516562

563+
new_group = Some((new_compaction_group_id, config));
564+
new_version_delta.group_deltas.insert(
565+
parent_group_id,
566+
GroupDeltas {
567+
group_deltas: vec![GroupDelta {
568+
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange {
569+
table_ids_remove: table_ids.to_vec(),
570+
..Default::default()
571+
})),
572+
}],
573+
},
574+
);
575+
new_compaction_group_id
576+
}
577+
};
517578
let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts);
518579
let mut trx = Transaction::default();
519580
new_version_delta.apply_to_txn(&mut trx)?;
520-
self.env.meta_store().txn(trx).await?;
581+
if let Some((new_compaction_group_id, config)) = new_group {
582+
let mut compaction_group_manager = self.compaction_group_manager.write().await;
583+
let insert = BTreeMapEntryTransaction::new_insert(
584+
&mut compaction_group_manager.compaction_groups,
585+
new_compaction_group_id,
586+
CompactionGroup {
587+
group_id: new_compaction_group_id,
588+
compaction_config: Arc::new(config),
589+
},
590+
);
591+
insert.apply_to_txn(&mut trx)?;
592+
self.env.meta_store().txn(trx).await?;
593+
insert.commit();
594+
} else {
595+
self.env.meta_store().txn(trx).await?;
596+
}
521597
let sst_split_info = versioning
522598
.current_version
523599
.apply_version_delta(&new_version_delta);
524600
// Updates SST split info
525-
for (object_id, sst_id, parent_old_sst_id, parent_new_sst_id) in sst_split_info {
601+
for (object_id, sst_id, _parent_old_sst_id, parent_new_sst_id) in sst_split_info {
526602
match branched_ssts.get_mut(object_id) {
527603
Some(mut entry) => {
528-
let p = entry.get_mut(&parent_group_id).unwrap();
529-
let parent_pos = p.iter().position(|id| *id == parent_old_sst_id).unwrap();
530604
if let Some(parent_new_sst_id) = parent_new_sst_id {
531-
p[parent_pos] = parent_new_sst_id;
605+
entry.insert(parent_group_id, parent_new_sst_id);
532606
} else {
533-
p.remove(parent_pos);
534-
if p.is_empty() {
535-
entry.remove(&parent_group_id);
536-
}
607+
entry.remove(&parent_group_id);
537608
}
538-
entry.entry(new_group_id).or_default().push(sst_id);
609+
entry.insert(target_compaction_group_id, sst_id);
539610
}
540611
None => {
541-
branched_ssts.insert(
542-
object_id,
543-
if let Some(parent_new_sst_id) = parent_new_sst_id {
544-
[
545-
(parent_group_id, vec![parent_new_sst_id]),
546-
(new_group_id, vec![sst_id]),
547-
]
548-
.into_iter()
549-
.collect()
550-
} else {
551-
[(new_group_id, vec![sst_id])].into_iter().collect()
552-
},
553-
);
612+
let mut groups = HashMap::from_iter([(target_compaction_group_id, sst_id)]);
613+
if let Some(parent_new_sst_id) = parent_new_sst_id {
614+
groups.insert(parent_group_id, parent_new_sst_id);
615+
}
616+
branched_ssts.insert(object_id, groups);
554617
}
555618
}
556619
}
557620
new_version_delta.commit();
558621
branched_ssts.commit_memory();
559622
self.notify_last_version_delta(versioning);
560-
561-
Ok(new_group_id)
623+
// Don't trigger compactions if we enable deterministic compaction
624+
if !self.env.opts.compaction_deterministic_test {
625+
// commit_epoch may contains SSTs from any compaction group
626+
self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim);
627+
self.try_send_compaction_request(
628+
target_compaction_group_id,
629+
compact_task::TaskType::SpaceReclaim,
630+
);
631+
}
632+
Ok(target_compaction_group_id)
562633
}
563634
}
564635

565636
#[derive(Default)]
566637
pub(super) struct CompactionGroupManager {
567638
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
568-
/// Provided default config, only used in test.
569-
provided_default_config_for_test: CompactionConfig,
639+
default_config: CompactionConfig,
570640
}
571641

572642
impl CompactionGroupManager {
@@ -602,14 +672,20 @@ impl CompactionGroupManager {
602672
compaction_group_ids
603673
.iter()
604674
.map(|id| {
605-
let group = self.compaction_groups.get(id).cloned().unwrap_or_else(|| {
606-
CompactionGroup::new(*id, self.provided_default_config_for_test.clone())
607-
});
675+
let group = self
676+
.compaction_groups
677+
.get(id)
678+
.cloned()
679+
.unwrap_or_else(|| CompactionGroup::new(*id, self.default_config.clone()));
608680
(*id, group)
609681
})
610682
.collect()
611683
}
612684

685+
fn get_default_compaction_group_config(&self) -> CompactionConfig {
686+
self.default_config.clone()
687+
}
688+
613689
async fn update_compaction_config<S: MetaStore>(
614690
&mut self,
615691
compaction_group_ids: &[CompactionGroupId],
@@ -621,10 +697,7 @@ impl CompactionGroupManager {
621697
if !compaction_groups.contains_key(compaction_group_id) {
622698
compaction_groups.insert(
623699
*compaction_group_id,
624-
CompactionGroup::new(
625-
*compaction_group_id,
626-
self.provided_default_config_for_test.clone(),
627-
),
700+
CompactionGroup::new(*compaction_group_id, self.default_config.clone()),
628701
);
629702
}
630703
let group = compaction_groups.get(compaction_group_id).unwrap();

0 commit comments

Comments
 (0)