fix(storage): replace imms with merged imms in any position of staging #14756

Merged · 6 commits · Jan 25, 2024
Changes from 3 commits
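
The core of the fix is in the title: when a merging task completes, its merged imm has to replace the constituent imms at whatever position they occupy in the staging imm queue, because newer imms may have been pushed in front of them while the merge was running. The following is a minimal, hypothetical sketch of that splice, not the PR's actual code: `Imm`, `merged_ids`, and the newest-first `VecDeque` layout are assumptions made for illustration.

```rust
// Hypothetical sketch of splicing a merged imm into the middle of a staging
// queue. `Imm` and `merged_ids` are illustrative stand-ins, not the real
// RisingWave types.
use std::collections::VecDeque;

#[derive(Debug)]
struct Imm {
    batch_id: u64,
    /// For a merged imm, the batch ids of the imms it replaced, in queue
    /// (newest-first) order; empty for a plain imm.
    merged_ids: Vec<u64>,
}

/// Staging imms ordered newest-first. Replaces the contiguous run of imms
/// whose ids match `merged.merged_ids` with the single merged imm, at any
/// position in the queue (not only the tail).
fn replace_with_merged(staging: &mut VecDeque<Imm>, merged: Imm) {
    if let Some(start) = staging
        .iter()
        .position(|imm| imm.batch_id == merged.merged_ids[0])
    {
        // Sanity-check that the whole run is contiguous before splicing.
        assert!(merged
            .merged_ids
            .iter()
            .zip(staging.iter().skip(start))
            .all(|(id, imm)| *id == imm.batch_id));
        let mut rest = staging.split_off(start);
        for _ in 0..merged.merged_ids.len() {
            rest.pop_front();
        }
        staging.push_back(merged);
        staging.append(&mut rest);
    }
}

fn main() {
    let imm = |id| Imm { batch_id: id, merged_ids: vec![] };
    // Newest-first queue: imms 4..=1 from a newer epoch on top of imm 0.
    let mut staging: VecDeque<Imm> = [4, 3, 2, 1, 0].into_iter().map(imm).collect();
    // Merged imm covering 3..=1, which sit in the *middle* of the queue.
    let merged = Imm { batch_id: 100, merged_ids: vec![3, 2, 1] };
    replace_with_merged(&mut staging, merged);
    let ids: Vec<u64> = staging.iter().map(|i| i.batch_id).collect();
    assert_eq!(ids, vec![4, 100, 0]);
}
```

The test added in this diff drives the same scenario end to end: it stages ten imms on top of an older one, holds the older epoch's sync task open, and asserts the queue ordering both before and after the merge completes.
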
293 changes: 269 additions & 24 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -34,7 +34,8 @@ use crate::hummock::compactor::{compact, CompactorContext};
use crate::hummock::conflict_detector::ConflictDetector;
use crate::hummock::event_handler::refiller::CacheRefillerEvent;
use crate::hummock::event_handler::uploader::{
HummockUploader, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent,
default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData,
UploadTaskInfo, UploadTaskPayload, UploaderEvent,
};
use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate};
use crate::hummock::local_version::pinned_version::PinnedVersion;
@@ -43,7 +44,7 @@ use crate::hummock::store::version::{
};
use crate::hummock::utils::validate_table_key_range;
use crate::hummock::{
HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, TrackerId,
HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId,
};
use crate::monitor::HummockStateStoreMetrics;
use crate::opts::StorageOpts;
@@ -126,7 +127,7 @@ pub struct HummockEventHandler {

last_instance_id: LocalInstanceId,

sstable_object_id_manager: Arc<SstableObjectIdManager>,
sstable_object_id_manager: Option<Arc<SstableObjectIdManager>>,
}

async fn flush_imms(
@@ -164,24 +165,17 @@ impl HummockEventHandler {
filter_key_extractor_manager: FilterKeyExtractorManager,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
state_store_metrics: Arc<HummockStateStoreMetrics>,
cache_refill_config: CacheRefillConfig,
) -> Self {
let (version_update_notifier_tx, _) =
tokio::sync::watch::channel(pinned_version.max_committed_epoch());
let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
let buffer_tracker = BufferTracker::from_storage_opts(
&compactor_context.storage_opts,
state_store_metrics.uploader_uploading_task_size.clone(),
);
let write_conflict_detector =
ConflictDetector::new_from_config(&compactor_context.storage_opts);
let sstable_store = compactor_context.sstable_store.clone();
let upload_compactor_context = compactor_context.clone();
let cloned_sstable_object_id_manager = sstable_object_id_manager.clone();
let uploader = HummockUploader::new(
Self::new_inner(
hummock_event_tx,
hummock_event_rx,
pinned_version,
Some(sstable_object_id_manager),
compactor_context.sstable_store.clone(),
state_store_metrics,
pinned_version.clone(),
&compactor_context.storage_opts,
Arc::new(move |payload, task_info| {
spawn(flush_imms(
payload,
@@ -191,11 +185,43 @@
cloned_sstable_object_id_manager.clone(),
))
}),
default_spawn_merging_task(compactor_context.compaction_executor.clone()),
)
}

fn new_inner(
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
pinned_version: PinnedVersion,
sstable_object_id_manager: Option<Arc<SstableObjectIdManager>>,
sstable_store: SstableStoreRef,
state_store_metrics: Arc<HummockStateStoreMetrics>,
storage_opts: &StorageOpts,
spawn_upload_task: SpawnUploadTask,
spawn_merging_task: SpawnMergingTask,
) -> Self {
let (version_update_notifier_tx, _) =
tokio::sync::watch::channel(pinned_version.max_committed_epoch());
let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
let buffer_tracker = BufferTracker::from_storage_opts(
storage_opts,
state_store_metrics.uploader_uploading_task_size.clone(),
);
let write_conflict_detector = ConflictDetector::new_from_config(storage_opts);

let uploader = HummockUploader::new(
state_store_metrics,
pinned_version.clone(),
spawn_upload_task,
spawn_merging_task,
buffer_tracker,
&compactor_context.storage_opts,
compactor_context.compaction_executor.clone(),
storage_opts,
);
let refiller = CacheRefiller::new(
CacheRefillConfig::from_storage_opts(storage_opts),
sstable_store,
);
let refiller = CacheRefiller::new(cache_refill_config, sstable_store);

Self {
hummock_event_tx,
@@ -393,8 +419,10 @@ impl HummockEventHandler {
});
}

self.sstable_object_id_manager
.remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX));
if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager
.remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX));
}

// Notify completion of the Clear event.
let _ = notifier.send(()).inspect_err(|e| {
@@ -466,10 +494,12 @@ impl HummockEventHandler {
if let Some(conflict_detector) = self.write_conflict_detector.as_ref() {
conflict_detector.set_watermark(max_committed_epoch);
}
self.sstable_object_id_manager
.remove_watermark_object_id(TrackerId::Epoch(

if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager {
sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch(
self.pinned_version.load().max_committed_epoch(),
));
}

debug!(
"update to hummock version: {}, epoch: {}",
@@ -714,3 +744,218 @@ fn to_sync_result(result: &HummockResult<SyncedData>) -> HummockResult<SyncResult>
))),
}
}

#[cfg(test)]
mod tests {
use std::future::poll_fn;
use std::iter::once;
use std::sync::Arc;
use std::task::Poll;

use bytes::Bytes;
use futures::FutureExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::hummock::PbHummockVersion;
use tokio::spawn;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot;
use tokio::task::yield_now;

use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner,
};
use crate::hummock::store::version::{StagingData, VersionUpdate};
use crate::hummock::test_utils::default_opts_for_test;
use crate::hummock::value::HummockValue;
use crate::hummock::HummockError;
use crate::monitor::HummockStateStoreMetrics;

#[tokio::test]
async fn test_event_handler() {
let (tx, rx) = unbounded_channel();
let table_id = TableId::new(123);
let epoch0 = 233;
let pinned_version = PinnedVersion::new(
HummockVersion::from_rpc_protobuf(&PbHummockVersion {
id: 1,
max_committed_epoch: epoch0,
..Default::default()
}),
unbounded_channel().0,
);

let mut storage_opts = default_opts_for_test();
storage_opts.imm_merge_threshold = 5;

let (spawn_upload_task_tx, mut spawn_upload_task_rx) = unbounded_channel();
let (spawn_merging_task_tx, mut spawn_merging_task_rx) = unbounded_channel();
let event_handler = HummockEventHandler::new_inner(
tx.clone(),
rx,
pinned_version,
None,
mock_sstable_store(),
Arc::new(HummockStateStoreMetrics::unused()),
&storage_opts,
Arc::new(move |_, _| {
let (tx, rx) = oneshot::channel::<()>();
spawn_upload_task_tx.send(tx).unwrap();
spawn(async move {
// wait for the main thread to notify before returning an error
rx.await.unwrap();
Err(HummockError::other("".to_string()))
})
}),
Arc::new(move |table_id, instance_id, imms, _| {
let (tx, rx) = oneshot::channel::<()>();
let (finish_tx, finish_rx) = oneshot::channel::<()>();
spawn_merging_task_tx.send((tx, finish_rx)).unwrap();
spawn(async move {
rx.await.unwrap();
finish_tx.send(()).unwrap();
let first_imm = &imms[0];
Ok(SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
first_imm.epochs().clone(),
first_imm.get_payload().iter().cloned().collect_vec(),
first_imm.raw_smallest_key().clone(),
first_imm.raw_largest_key().clone(),
100,
imms.iter().map(|imm| imm.batch_id()).collect_vec(),
vec![],
100,
None,
)),
table_id,
instance_id,
})
})
}),
);

let _join_handle = spawn(event_handler.start_hummock_event_handler_worker());

let (read_version_tx, read_version_rx) = oneshot::channel();

tx.send(HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender: read_version_tx,
is_replicated: false,
})
.unwrap();
let (read_version, guard) = read_version_rx.await.unwrap();
let instance_id = guard.instance_id;

let build_batch = |epoch, spill_offset| {
SharedBufferBatch::build_shared_buffer_batch(
epoch,
spill_offset,
vec![(TableKey(Bytes::from("key")), HummockValue::Delete)],
10,
vec![],
table_id,
Some(instance_id),
None,
)
};

let epoch1 = epoch0 + 1;
let imm1 = build_batch(epoch1, 0);
read_version
.write()
.update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone())));
tx.send(HummockEvent::ImmToUploader(imm1.clone())).unwrap();
tx.send(HummockEvent::SealEpoch {
epoch: epoch1,
is_checkpoint: true,
})
.unwrap();
let (sync_tx, mut sync_rx) = oneshot::channel();
tx.send(HummockEvent::AwaitSyncEpoch {
new_sync_epoch: epoch1,
sync_result_sender: sync_tx,
})
.unwrap();

let upload_finish_tx = spawn_upload_task_rx.recv().await.unwrap();
assert!(poll_fn(|cx| Poll::Ready(sync_rx.poll_unpin(cx)))
.await
.is_pending());

let epoch2 = epoch1 + 1;
let mut imm_ids = Vec::new();
for i in 0..10 {
let imm = build_batch(epoch2, i);
imm_ids.push(imm.batch_id());
read_version
.write()
.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));
tx.send(HummockEvent::ImmToUploader(imm)).unwrap();
}

for (staging_imm, imm_id) in read_version
.read()
.staging()
.imm
.iter()
.zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id())))
{
assert_eq!(staging_imm.batch_id(), imm_id);
}

// sealing the epoch should start the merging task
tx.send(HummockEvent::SealEpoch {
epoch: epoch2,
is_checkpoint: false,
})
.unwrap();

println!("before wait spawn merging task");

let (merging_start_tx, merging_finish_rx) = spawn_merging_task_rx.recv().await.unwrap();
merging_start_tx.send(()).unwrap();

println!("after wait spawn merging task");

// yield to possibly poll the merging task, though it shouldn't be polled yet because there is an unfinished syncing task
yield_now().await;

for (staging_imm, imm_id) in read_version
.read()
.staging()
.imm
.iter()
.zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id())))
{
assert_eq!(staging_imm.batch_id(), imm_id);
}

upload_finish_tx.send(()).unwrap();
assert!(sync_rx.await.unwrap().is_err());

merging_finish_rx.await.unwrap();

// yield to poll the merging task, which should have finished by then.
for _ in 0..10 {
yield_now().await;
}

assert_eq!(
read_version
.read()
.staging()
.imm
.iter()
.map(|imm| imm.batch_id())
.collect_vec(),
vec![*imm_ids.last().unwrap(), imm1.batch_id()]
);
}
}
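
What makes this test possible is the new `new_inner` constructor: the upload and merging spawners arrive as injected closures (`SpawnUploadTask`, `SpawnMergingTask`) instead of being hardwired to `flush_imms` and `default_spawn_merging_task`, so the test hands out oneshot senders and decides exactly when each task completes or fails. Below is a stripped-down sketch of that injection pattern, using hypothetical `Handler` and `SpawnTask` stand-ins rather than the real types.

```rust
// Minimal sketch of dependency-injected task spawners for deterministic
// tests. `Handler` and `SpawnTask` are illustrative, not RisingWave's types.
// Assumes tokio with the "rt" and "macros" features.
use std::sync::Arc;

use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

type SpawnTask = Arc<dyn Fn(u64) -> JoinHandle<Result<u64, String>> + Send + Sync>;

struct Handler {
    spawn_task: SpawnTask,
}

impl Handler {
    // Production code would pass a closure doing real work; tests inject one.
    fn new_inner(spawn_task: SpawnTask) -> Self {
        Self { spawn_task }
    }
}

#[tokio::main]
async fn main() {
    let (task_tx, mut task_rx) = mpsc::unbounded_channel();
    let handler = Handler::new_inner(Arc::new(move |input| {
        let (release_tx, release_rx) = oneshot::channel::<()>();
        task_tx.send(release_tx).unwrap();
        tokio::spawn(async move {
            // The task blocks until the test releases it, making the
            // interleaving of task completions fully deterministic.
            release_rx.await.unwrap();
            Ok(input + 1)
        })
    }));

    let join = (handler.spawn_task)(41);
    // The test holds the release handle and completes the task on demand.
    let release = task_rx.recv().await.unwrap();
    release.send(()).unwrap();
    assert_eq!(join.await.unwrap().unwrap(), 42);
}
```

Production code keeps calling `new`, which forwards the real `flush_imms`-based closure and `default_spawn_merging_task` to `new_inner`; only tests reach for the injectable constructor.
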
17 changes: 17 additions & 0 deletions src/storage/src/hummock/event_handler/refiller.rs
@@ -44,6 +44,7 @@ use crate::hummock::{
SstableStoreRef, TableHolder,
};
use crate::monitor::StoreLocalStatistic;
use crate::opts::StorageOpts;

pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));
@@ -201,6 +202,22 @@ pub struct CacheRefillConfig {
pub threshold: f64,
}

impl CacheRefillConfig {
pub fn from_storage_opts(options: &StorageOpts) -> Self {
Self {
timeout: Duration::from_millis(options.cache_refill_timeout_ms),
data_refill_levels: options
.cache_refill_data_refill_levels
.iter()
.copied()
.collect(),
concurrency: options.cache_refill_concurrency,
unit: options.cache_refill_unit,
threshold: options.cache_refill_threshold,
}
}
}

struct Item {
handle: JoinHandle<()>,
event: CacheRefillerEvent,
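
The refiller change is mechanical: `CacheRefillConfig::from_storage_opts` centralizes the `StorageOpts`-to-config translation that callers previously performed before passing a `cache_refill_config` into `HummockEventHandler::new`, so `new_inner` can build the refiller from `storage_opts` alone. Below is a self-contained sketch of the same constructor pattern with simplified stand-in types; the real structs carry more fields and live in `refiller.rs` and `opts.rs`, and the option values in `main` are made up for the example.

```rust
// Sketch of deriving a focused config struct from global options in one
// place. Field names mirror the diff; the types are simplified stand-ins.
use std::collections::HashSet;
use std::time::Duration;

struct StorageOpts {
    cache_refill_timeout_ms: u64,
    cache_refill_data_refill_levels: Vec<u32>,
    cache_refill_concurrency: usize,
    cache_refill_unit: usize,
    cache_refill_threshold: f64,
}

struct CacheRefillConfig {
    timeout: Duration,
    data_refill_levels: HashSet<u32>,
    concurrency: usize,
    unit: usize,
    threshold: f64,
}

impl CacheRefillConfig {
    fn from_storage_opts(options: &StorageOpts) -> Self {
        Self {
            timeout: Duration::from_millis(options.cache_refill_timeout_ms),
            data_refill_levels: options
                .cache_refill_data_refill_levels
                .iter()
                .copied()
                .collect(),
            concurrency: options.cache_refill_concurrency,
            unit: options.cache_refill_unit,
            threshold: options.cache_refill_threshold,
        }
    }
}

fn main() {
    // Hypothetical option values, for illustration only.
    let opts = StorageOpts {
        cache_refill_timeout_ms: 6000,
        cache_refill_data_refill_levels: vec![0, 1],
        cache_refill_concurrency: 10,
        cache_refill_unit: 64,
        cache_refill_threshold: 0.5,
    };
    let config = CacheRefillConfig::from_storage_opts(&opts);
    assert_eq!(config.timeout, Duration::from_secs(6));
    assert!(config.data_refill_levels.contains(&1));
}
```
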