Skip to content

Commit c8030fc

Browse files
authored
fix(storage): replace imms with merged imms in any position of staging (#14756)
1 parent 2559b23 commit c8030fc

File tree

6 files changed

+427
-136
lines changed

6 files changed

+427
-136
lines changed

src/storage/src/hummock/event_handler/hummock_event_handler.rs

Lines changed: 269 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use crate::hummock::compactor::{compact, CompactorContext};
3434
use crate::hummock::conflict_detector::ConflictDetector;
3535
use crate::hummock::event_handler::refiller::CacheRefillerEvent;
3636
use crate::hummock::event_handler::uploader::{
37-
HummockUploader, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent,
37+
default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData,
38+
UploadTaskInfo, UploadTaskPayload, UploaderEvent,
3839
};
3940
use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate};
4041
use crate::hummock::local_version::pinned_version::PinnedVersion;
@@ -43,7 +44,7 @@ use crate::hummock::store::version::{
4344
};
4445
use crate::hummock::utils::validate_table_key_range;
4546
use crate::hummock::{
46-
HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, TrackerId,
47+
HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId,
4748
};
4849
use crate::monitor::HummockStateStoreMetrics;
4950
use crate::opts::StorageOpts;
@@ -126,7 +127,7 @@ pub struct HummockEventHandler {
126127

127128
last_instance_id: LocalInstanceId,
128129

129-
sstable_object_id_manager: Arc<SstableObjectIdManager>,
130+
sstable_object_id_manager: Option<Arc<SstableObjectIdManager>>,
130131
}
131132

132133
async fn flush_imms(
@@ -164,24 +165,17 @@ impl HummockEventHandler {
164165
filter_key_extractor_manager: FilterKeyExtractorManager,
165166
sstable_object_id_manager: Arc<SstableObjectIdManager>,
166167
state_store_metrics: Arc<HummockStateStoreMetrics>,
167-
cache_refill_config: CacheRefillConfig,
168168
) -> Self {
169-
let (version_update_notifier_tx, _) =
170-
tokio::sync::watch::channel(pinned_version.max_committed_epoch());
171-
let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
172-
let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
173-
let buffer_tracker = BufferTracker::from_storage_opts(
174-
&compactor_context.storage_opts,
175-
state_store_metrics.uploader_uploading_task_size.clone(),
176-
);
177-
let write_conflict_detector =
178-
ConflictDetector::new_from_config(&compactor_context.storage_opts);
179-
let sstable_store = compactor_context.sstable_store.clone();
180169
let upload_compactor_context = compactor_context.clone();
181170
let cloned_sstable_object_id_manager = sstable_object_id_manager.clone();
182-
let uploader = HummockUploader::new(
171+
Self::new_inner(
172+
hummock_event_tx,
173+
hummock_event_rx,
174+
pinned_version,
175+
Some(sstable_object_id_manager),
176+
compactor_context.sstable_store.clone(),
183177
state_store_metrics,
184-
pinned_version.clone(),
178+
&compactor_context.storage_opts,
185179
Arc::new(move |payload, task_info| {
186180
spawn(flush_imms(
187181
payload,
@@ -191,11 +185,43 @@ impl HummockEventHandler {
191185
cloned_sstable_object_id_manager.clone(),
192186
))
193187
}),
188+
default_spawn_merging_task(compactor_context.compaction_executor.clone()),
189+
)
190+
}
191+
192+
fn new_inner(
193+
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
194+
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
195+
pinned_version: PinnedVersion,
196+
sstable_object_id_manager: Option<Arc<SstableObjectIdManager>>,
197+
sstable_store: SstableStoreRef,
198+
state_store_metrics: Arc<HummockStateStoreMetrics>,
199+
storage_opts: &StorageOpts,
200+
spawn_upload_task: SpawnUploadTask,
201+
spawn_merging_task: SpawnMergingTask,
202+
) -> Self {
203+
let (version_update_notifier_tx, _) =
204+
tokio::sync::watch::channel(pinned_version.max_committed_epoch());
205+
let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
206+
let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
207+
let buffer_tracker = BufferTracker::from_storage_opts(
208+
storage_opts,
209+
state_store_metrics.uploader_uploading_task_size.clone(),
210+
);
211+
let write_conflict_detector = ConflictDetector::new_from_config(storage_opts);
212+
213+
let uploader = HummockUploader::new(
214+
state_store_metrics,
215+
pinned_version.clone(),
216+
spawn_upload_task,
217+
spawn_merging_task,
194218
buffer_tracker,
195-
&compactor_context.storage_opts,
196-
compactor_context.compaction_executor.clone(),
219+
storage_opts,
220+
);
221+
let refiller = CacheRefiller::new(
222+
CacheRefillConfig::from_storage_opts(storage_opts),
223+
sstable_store,
197224
);
198-
let refiller = CacheRefiller::new(cache_refill_config, sstable_store);
199225

200226
Self {
201227
hummock_event_tx,
@@ -393,8 +419,10 @@ impl HummockEventHandler {
393419
});
394420
}
395421

396-
self.sstable_object_id_manager
397-
.remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX));
422+
if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
423+
sstable_object_id_manager
424+
.remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX));
425+
}
398426

399427
// Notify completion of the Clear event.
400428
let _ = notifier.send(()).inspect_err(|e| {
@@ -466,10 +494,12 @@ impl HummockEventHandler {
466494
if let Some(conflict_detector) = self.write_conflict_detector.as_ref() {
467495
conflict_detector.set_watermark(max_committed_epoch);
468496
}
469-
self.sstable_object_id_manager
470-
.remove_watermark_object_id(TrackerId::Epoch(
497+
498+
if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
499+
sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch(
471500
self.pinned_version.load().max_committed_epoch(),
472501
));
502+
}
473503

474504
debug!(
475505
"update to hummock version: {}, epoch: {}",
@@ -714,3 +744,218 @@ fn to_sync_result(result: &HummockResult<SyncedData>) -> HummockResult<SyncResul
714744
))),
715745
}
716746
}
747+
748+
#[cfg(test)]
749+
mod tests {
750+
use std::future::poll_fn;
751+
use std::iter::once;
752+
use std::sync::Arc;
753+
use std::task::Poll;
754+
755+
use bytes::Bytes;
756+
use futures::FutureExt;
757+
use itertools::Itertools;
758+
use risingwave_common::catalog::TableId;
759+
use risingwave_common::util::iter_util::ZipEqDebug;
760+
use risingwave_hummock_sdk::key::TableKey;
761+
use risingwave_hummock_sdk::version::HummockVersion;
762+
use risingwave_pb::hummock::PbHummockVersion;
763+
use tokio::spawn;
764+
use tokio::sync::mpsc::unbounded_channel;
765+
use tokio::sync::oneshot;
766+
use tokio::task::yield_now;
767+
768+
use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
769+
use crate::hummock::iterator::test_utils::mock_sstable_store;
770+
use crate::hummock::local_version::pinned_version::PinnedVersion;
771+
use crate::hummock::shared_buffer::shared_buffer_batch::{
772+
SharedBufferBatch, SharedBufferBatchInner,
773+
};
774+
use crate::hummock::store::version::{StagingData, VersionUpdate};
775+
use crate::hummock::test_utils::default_opts_for_test;
776+
use crate::hummock::value::HummockValue;
777+
use crate::hummock::HummockError;
778+
use crate::monitor::HummockStateStoreMetrics;
779+
780+
#[tokio::test]
781+
async fn test_event_handler() {
782+
let (tx, rx) = unbounded_channel();
783+
let table_id = TableId::new(123);
784+
let epoch0 = 233;
785+
let pinned_version = PinnedVersion::new(
786+
HummockVersion::from_rpc_protobuf(&PbHummockVersion {
787+
id: 1,
788+
max_committed_epoch: epoch0,
789+
..Default::default()
790+
}),
791+
unbounded_channel().0,
792+
);
793+
794+
let mut storage_opts = default_opts_for_test();
795+
storage_opts.imm_merge_threshold = 5;
796+
797+
let (spawn_upload_task_tx, mut spawn_upload_task_rx) = unbounded_channel();
798+
let (spawn_merging_task_tx, mut spawn_merging_task_rx) = unbounded_channel();
799+
let event_handler = HummockEventHandler::new_inner(
800+
tx.clone(),
801+
rx,
802+
pinned_version,
803+
None,
804+
mock_sstable_store(),
805+
Arc::new(HummockStateStoreMetrics::unused()),
806+
&storage_opts,
807+
Arc::new(move |_, _| {
808+
let (tx, rx) = oneshot::channel::<()>();
809+
spawn_upload_task_tx.send(tx).unwrap();
810+
spawn(async move {
811+
// wait for main thread to notify returning error
812+
rx.await.unwrap();
813+
Err(HummockError::other("".to_string()))
814+
})
815+
}),
816+
Arc::new(move |table_id, instance_id, imms, _| {
817+
let (tx, rx) = oneshot::channel::<()>();
818+
let (finish_tx, finish_rx) = oneshot::channel::<()>();
819+
spawn_merging_task_tx.send((tx, finish_rx)).unwrap();
820+
spawn(async move {
821+
rx.await.unwrap();
822+
finish_tx.send(()).unwrap();
823+
let first_imm = &imms[0];
824+
Ok(SharedBufferBatch {
825+
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
826+
first_imm.epochs().clone(),
827+
first_imm.get_payload().iter().cloned().collect_vec(),
828+
first_imm.raw_smallest_key().clone(),
829+
first_imm.raw_largest_key().clone(),
830+
100,
831+
imms.iter().map(|imm| imm.batch_id()).collect_vec(),
832+
vec![],
833+
100,
834+
None,
835+
)),
836+
table_id,
837+
instance_id,
838+
})
839+
})
840+
}),
841+
);
842+
843+
let _join_handle = spawn(event_handler.start_hummock_event_handler_worker());
844+
845+
let (read_version_tx, read_version_rx) = oneshot::channel();
846+
847+
tx.send(HummockEvent::RegisterReadVersion {
848+
table_id,
849+
new_read_version_sender: read_version_tx,
850+
is_replicated: false,
851+
})
852+
.unwrap();
853+
let (read_version, guard) = read_version_rx.await.unwrap();
854+
let instance_id = guard.instance_id;
855+
856+
let build_batch = |epoch, spill_offset| {
857+
SharedBufferBatch::build_shared_buffer_batch(
858+
epoch,
859+
spill_offset,
860+
vec![(TableKey(Bytes::from("key")), HummockValue::Delete)],
861+
10,
862+
vec![],
863+
table_id,
864+
instance_id,
865+
None,
866+
)
867+
};
868+
869+
let epoch1 = epoch0 + 1;
870+
let imm1 = build_batch(epoch1, 0);
871+
read_version
872+
.write()
873+
.update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone())));
874+
tx.send(HummockEvent::ImmToUploader(imm1.clone())).unwrap();
875+
tx.send(HummockEvent::SealEpoch {
876+
epoch: epoch1,
877+
is_checkpoint: true,
878+
})
879+
.unwrap();
880+
let (sync_tx, mut sync_rx) = oneshot::channel();
881+
tx.send(HummockEvent::AwaitSyncEpoch {
882+
new_sync_epoch: epoch1,
883+
sync_result_sender: sync_tx,
884+
})
885+
.unwrap();
886+
887+
let upload_finish_tx = spawn_upload_task_rx.recv().await.unwrap();
888+
assert!(poll_fn(|cx| Poll::Ready(sync_rx.poll_unpin(cx)))
889+
.await
890+
.is_pending());
891+
892+
let epoch2 = epoch1 + 1;
893+
let mut imm_ids = Vec::new();
894+
for i in 0..10 {
895+
let imm = build_batch(epoch2, i);
896+
imm_ids.push(imm.batch_id());
897+
read_version
898+
.write()
899+
.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));
900+
tx.send(HummockEvent::ImmToUploader(imm)).unwrap();
901+
}
902+
903+
for (staging_imm, imm_id) in read_version
904+
.read()
905+
.staging()
906+
.imm
907+
.iter()
908+
.zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id())))
909+
{
910+
assert_eq!(staging_imm.batch_id(), imm_id);
911+
}
912+
913+
// should start merging task
914+
tx.send(HummockEvent::SealEpoch {
915+
epoch: epoch2,
916+
is_checkpoint: false,
917+
})
918+
.unwrap();
919+
920+
println!("before wait spawn merging task");
921+
922+
let (merging_start_tx, merging_finish_rx) = spawn_merging_task_rx.recv().await.unwrap();
923+
merging_start_tx.send(()).unwrap();
924+
925+
println!("after wait spawn merging task");
926+
927+
// yield to possibly poll the merging task, though it shouldn't poll it because there is unfinished syncing task
928+
yield_now().await;
929+
930+
for (staging_imm, imm_id) in read_version
931+
.read()
932+
.staging()
933+
.imm
934+
.iter()
935+
.zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id())))
936+
{
937+
assert_eq!(staging_imm.batch_id(), imm_id);
938+
}
939+
940+
upload_finish_tx.send(()).unwrap();
941+
assert!(sync_rx.await.unwrap().is_err());
942+
943+
merging_finish_rx.await.unwrap();
944+
945+
// yield to poll the merging task, and then it should have finished.
946+
for _ in 0..10 {
947+
yield_now().await;
948+
}
949+
950+
assert_eq!(
951+
read_version
952+
.read()
953+
.staging()
954+
.imm
955+
.iter()
956+
.map(|imm| imm.batch_id())
957+
.collect_vec(),
958+
vec![*imm_ids.last().unwrap(), imm1.batch_id()]
959+
);
960+
}
961+
}

src/storage/src/hummock/event_handler/refiller.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::hummock::{
4444
SstableStoreRef, TableHolder,
4545
};
4646
use crate::monitor::StoreLocalStatistic;
47+
use crate::opts::StorageOpts;
4748

4849
pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
4950
LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));
@@ -201,6 +202,22 @@ pub struct CacheRefillConfig {
201202
pub threshold: f64,
202203
}
203204

205+
impl CacheRefillConfig {
206+
pub fn from_storage_opts(options: &StorageOpts) -> Self {
207+
Self {
208+
timeout: Duration::from_millis(options.cache_refill_timeout_ms),
209+
data_refill_levels: options
210+
.cache_refill_data_refill_levels
211+
.iter()
212+
.copied()
213+
.collect(),
214+
concurrency: options.cache_refill_concurrency,
215+
unit: options.cache_refill_unit,
216+
threshold: options.cache_refill_threshold,
217+
}
218+
}
219+
}
220+
204221
struct Item {
205222
handle: JoinHandle<()>,
206223
event: CacheRefillerEvent,

0 commit comments

Comments
 (0)