Skip to content

fix(storage): replace imms with merged imms in any position of staging #14756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,12 @@ impl UploadingTask {
Ok(task_result) => task_result
.inspect(|_| {
if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
info!("upload task finish {:?}", self.task_info)
info!(task_info = ?self.task_info, "upload task finish");
} else {
debug!("upload task finish {:?}", self.task_info)
debug!(task_info = ?self.task_info, "upload task finish");
}
})
.inspect_err(|e| error!(task_info = ?self.task_info, ?e, "upload task failed"))
.map(|ssts| {
StagingSstableInfo::new(
ssts,
Expand Down Expand Up @@ -391,6 +392,7 @@ struct SealedData {
merged_imms: VecDeque<ImmutableMemtable>,

// Sealed imms grouped by table shard.
// newer data (larger imm id) at the front
imms_by_table_shard: HashMap<(TableId, LocalInstanceId), VecDeque<ImmutableMemtable>>,

// Merging tasks generated from sealed imms
Expand Down Expand Up @@ -454,10 +456,14 @@ impl SealedData {

// rearrange sealed imms by table shard and in epoch descending order
for imm in unseal_epoch_data.imms.into_iter().rev() {
self.imms_by_table_shard
let queue = self
.imms_by_table_shard
.entry((imm.table_id, imm.instance_id))
.or_default()
.push_front(imm);
.or_default();
if let Some(front) = queue.front() {
assert_gt!(imm.batch_id(), front.batch_id());
}
queue.push_front(imm);
}

self.epochs.push_front(epoch);
Expand Down
5 changes: 4 additions & 1 deletion src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ struct SharedBufferDeleteRangeMeta {
pub(crate) struct SharedBufferBatchInner {
payload: Vec<SharedBufferVersionedEntry>,
/// The list of imm ids that are merged into this batch
/// This field is immutable
/// This field is immutable.
///
/// Larger imm id at the front.
imm_ids: Vec<ImmId>,
/// The epochs of the data in batch, sorted in ascending order (old to new)
epochs: Vec<HummockEpoch>,
Expand Down Expand Up @@ -200,6 +202,7 @@ impl SharedBufferBatchInner {
tracker: Option<MemoryTracker>,
) -> Self {
debug_assert!(!imm_ids.is_empty());
debug_assert!(imm_ids.iter().rev().is_sorted());
debug_assert!(!epochs.is_empty());
debug_assert!(epochs.is_sorted());

Expand Down
133 changes: 55 additions & 78 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ pub struct StagingVersion {
// imm of smaller batch id may be added later than one with greater batch id
pub imm: VecDeque<ImmutableMemtable>,

// Separate queue for merged imm to ease the management of imm and merged imm.
// Newer merged imm comes first
pub merged_imm: VecDeque<ImmutableMemtable>,

// newer data comes first
pub sst: VecDeque<StagingSstableInfo>,
}
Expand All @@ -162,20 +158,16 @@ impl StagingVersion {
let (ref left, ref right) = table_key_range;
let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
let overlapped_imms = self
.imm
.iter()
.chain(self.merged_imm.iter())
.filter(move |imm| {
// retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
imm.min_epoch() <= max_epoch_inclusive
&& imm.table_id == table_id
&& range_overlap(
&(left, right),
&imm.start_table_key(),
imm.end_table_key().as_ref(),
)
});
let overlapped_imms = self.imm.iter().filter(move |imm| {
// retain imms that overlap with (min_epoch_exclusive, max_epoch_inclusive]
imm.min_epoch() <= max_epoch_inclusive
&& imm.table_id == table_id
&& range_overlap(
&(left, right),
&imm.start_table_key(),
imm.end_table_key().as_ref(),
)
});

// TODO: Remove duplicate sst based on sst id
let overlapped_ssts = self
Expand Down Expand Up @@ -238,7 +230,6 @@ impl HummockReadVersion {
.cloned(),
staging: StagingVersion {
imm: VecDeque::default(),
merged_imm: Default::default(),
sst: VecDeque::default(),
},

Expand Down Expand Up @@ -275,10 +266,6 @@ impl HummockReadVersion {
self.staging.imm.push_front(imm)
}
StagingData::MergedImmMem(merged_imm) => {
if let Some(item) = self.staging.merged_imm.front() {
// check batch_id order from newest to old
debug_assert!(item.batch_id() < merged_imm.batch_id());
}
self.add_merged_imm(merged_imm);
}
StagingData::Sst(staging_sst) => {
Expand All @@ -296,18 +283,12 @@ impl HummockReadVersion {
.staging
.imm
.iter()
.chain(self.staging.merged_imm.iter())
.rev()
.is_sorted_by_key(|imm| imm.batch_id()));

// Calculate intersection
let staging_imm_ids_from_imms: HashSet<u64> = self
.staging
.imm
.iter()
.chain(self.staging.merged_imm.iter())
.map(|imm| imm.batch_id())
.collect();
let staging_imm_ids_from_imms: HashSet<u64> =
self.staging.imm.iter().map(|imm| imm.batch_id()).collect();

// intersected batch_id order from oldest to newest
let intersect_imm_ids = staging_sst
Expand All @@ -322,21 +303,12 @@ impl HummockReadVersion {
// Check 2)
debug_assert!(check_subset_preserve_order(
intersect_imm_ids.iter().copied(),
self.staging
.imm
.iter()
.chain(self.staging.merged_imm.iter())
.map(|imm| imm.batch_id())
.rev(),
self.staging.imm.iter().map(|imm| imm.batch_id()).rev(),
));

// Check 3) and replace imms with a staging sst
for imm_id in &intersect_imm_ids {
if let Some(merged_imm) = self.staging.merged_imm.back() {
if *imm_id == merged_imm.batch_id() {
self.staging.merged_imm.pop_back();
}
} else if let Some(imm) = self.staging.imm.back() {
if let Some(imm) = self.staging.imm.back() {
if *imm_id == imm.batch_id() {
self.staging.imm.pop_back();
}
Expand All @@ -348,24 +320,16 @@ impl HummockReadVersion {
.map(|imm| imm.batch_id())
.collect_vec();

let merged_imm_ids = self
.staging
.merged_imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec();
unreachable!(
"should not reach here staging_sst.size {},
staging_sst.imm_ids {:?},
staging_sst.epochs {:?},
local_imm_ids {:?},
merged_imm_ids {:?},
intersect_imm_ids {:?}",
staging_sst.imm_size,
staging_sst.imm_ids,
staging_sst.epochs,
local_imm_ids,
merged_imm_ids,
intersect_imm_ids,
);
}
Expand All @@ -389,10 +353,6 @@ impl HummockReadVersion {
.imm
.retain(|imm| imm.min_epoch() > max_committed_epoch);

self.staging
.merged_imm
.retain(|merged_imm| merged_imm.min_epoch() > max_committed_epoch);

self.staging.sst.retain(|sst| {
sst.epochs.first().expect("epochs not empty") > &max_committed_epoch
});
Expand Down Expand Up @@ -446,7 +406,6 @@ impl HummockReadVersion {

pub fn clear_uncommitted(&mut self) {
self.staging.imm.clear();
self.staging.merged_imm.clear();
self.staging.sst.clear();
self.table_watermarks = self
.committed
Expand All @@ -456,32 +415,50 @@ impl HummockReadVersion {
}

pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable) {
let staging_imm_count = self.staging.imm.len();
let merged_imm_ids = merged_imm.get_imm_ids();

#[cfg(debug_assertions)]
{
// check the suffix `merged_imm_ids.len()` imms in staging.imm are the same as
// `merged_imm_ids`
let diff = staging_imm_count - merged_imm_ids.len();
let mut count: usize = 0;
for (i, imm) in self.staging.imm.iter().skip(diff).enumerate() {
count += 1;
assert_eq!(
imm.batch_id(),
merged_imm_ids[i],
"merged_imm_ids: {:?}",
merged_imm_ids
);
assert!(merged_imm.get_imm_ids().iter().rev().is_sorted());
let min_imm_id = *merged_imm.get_imm_ids().last().expect("non-empty");

let back = self.staging.imm.back().expect("should not be empty");

// pop and save any imms that were written earlier than the oldest imm contained in the merged imm
let earlier_imms = if back.batch_id() < min_imm_id {
let mut earlier_imms = VecDeque::with_capacity(self.staging.imm.len());
loop {
let batch_id = self
.staging
.imm
.back()
.expect("should not be empty")
.batch_id();
match batch_id.cmp(&min_imm_id) {
Ordering::Less => {
let imm = self.staging.imm.pop_back().unwrap();
earlier_imms.push_front(imm);
}
Ordering::Equal => {
break;
}
Ordering::Greater => {
unreachable!("must have break in equal")
}
}
}
assert_eq!(count, merged_imm_ids.len());
Some(earlier_imms)
} else {
assert_eq!(back.batch_id(), min_imm_id);
None
};

// iterate from the smallest imm id upward, popping the matching older imm from the back each time.
for imm_id in merged_imm.get_imm_ids().iter().rev() {
let imm = self.staging.imm.pop_back().expect("should exist");
assert_eq!(imm.batch_id(), *imm_id);
}
self.staging
.imm
.truncate(staging_imm_count - merged_imm_ids.len());

// add the newly merged imm into front
self.staging.merged_imm.push_front(merged_imm);
self.staging.imm.push_back(merged_imm);
if let Some(earlier_imms) = earlier_imms {
self.staging.imm.extend(earlier_imms);
}
}

pub fn is_replicated(&self) -> bool {
Expand Down