Skip to content

fix(observation): fix missed hitmap updates, fix drop order #14788

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 25, 2024
8 changes: 4 additions & 4 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
pub const DEFAULT_ENTRY_SIZE: usize = 24; // table_id(u64) + primary_key(u64) + epoch(u64)

pub const HITMAP_ELEMS: usize = 4;

#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum LenType {
Expand Down Expand Up @@ -154,7 +152,7 @@ pub struct Block {
/// Restart points.
restart_points: Vec<RestartPoint>,

hitmap: Hitmap<HITMAP_ELEMS>,
hitmap: Hitmap<{ Self::HITMAP_ELEMS }>,
}

impl Clone for Block {
Expand All @@ -180,6 +178,8 @@ impl Debug for Block {
}

impl Block {
pub const HITMAP_ELEMS: usize = 4;

pub fn get_algorithm(buf: &Bytes) -> HummockResult<CompressionAlgorithm> {
let compression = CompressionAlgorithm::decode(&mut &buf[buf.len() - 9..buf.len() - 8])?;
Ok(compression)
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Block {
&self.data[..]
}

pub fn hitmap(&self) -> &Hitmap<HITMAP_ELEMS> {
pub fn hitmap(&self) -> &Hitmap<{ Self::HITMAP_ELEMS }> {
&self.hitmap
}

Expand Down
53 changes: 36 additions & 17 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::BytesMut;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;

use super::{KeyPrefix, LenType, RestartPoint, HITMAP_ELEMS};
use super::{Block, KeyPrefix, LenType, RestartPoint};
use crate::hummock::BlockHolder;
use crate::monitor::LocalHitmap;

Expand All @@ -41,13 +41,22 @@ pub struct BlockIterator {
last_key_len_type: LenType,
last_value_len_type: LenType,

/// NOTE: `hitmap` is supposed to be updated every time `value_range` is updated.
hitmap: LocalHitmap<HITMAP_ELEMS>,
/// NOTE:
///
/// - `hitmap` is supposed to be updated each time the block data is accessed at a new position.
/// - `hitmap` must be reported to the block hitmap before it is dropped.
hitmap: LocalHitmap<{ Block::HITMAP_ELEMS }>,
}

/// On drop, hands the iterator's locally recorded hits over to the hitmap of the
/// block it was iterating.
impl Drop for BlockIterator {
    fn drop(&mut self) {
        // Borrow the two fields side by side: the block is only read, while the
        // local hitmap is passed mutably to `report`.
        let Self { block, hitmap, .. } = self;
        block.hitmap().report(hitmap);
    }
}

impl BlockIterator {
pub fn new(block: BlockHolder) -> Self {
let hitmap = block.hitmap().local();
let hitmap = LocalHitmap::default();
Self {
block,
offset: usize::MAX,
Expand Down Expand Up @@ -175,7 +184,8 @@ impl BlockIterator {
self.offset = offset;
self.entry_len = prefix.entry_len();

self.update_hitmap();
self.hitmap
.fill_with_range(self.offset, self.value_range.end, self.block.len());

true
}
Expand Down Expand Up @@ -246,27 +256,41 @@ impl BlockIterator {
}

/// Searches the restart point index that the given `key` belongs to.
fn search_restart_point_index_by_key(&self, key: FullKey<&[u8]>) -> usize {
fn search_restart_point_index_by_key(&mut self, key: FullKey<&[u8]>) -> usize {
// Find the largest restart point whose restart key is less than or equal to the given key.
self.block
let res = self
.block
.search_restart_partition_point(
|&RestartPoint {
offset: probe,
key_len_type,
value_len_type,
}| {
let prefix =
self.decode_prefix_at(probe as usize, key_len_type, value_len_type);
let probe = probe as usize;
let prefix = KeyPrefix::decode(
&mut &self.block.data()[probe..],
probe,
key_len_type,
value_len_type,
);
let probe_key = &self.block.data()[prefix.diff_key_range()];
let full_probe_key =
FullKey::from_slice_without_table_id(self.block.table_id(), probe_key);
self.hitmap.fill_with_range(
probe,
prefix.diff_key_range().end,
self.block.len(),
);
match full_probe_key.cmp(&key) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
},
)
.saturating_sub(1) // Prevent from underflowing when given is smaller than the first.
// Prevent underflow when the given key is smaller than the first restart key.
.saturating_sub(1);

res
}

/// Seeks to the restart point that the given `key` belongs to.
Expand All @@ -291,7 +315,8 @@ impl BlockIterator {
self.entry_len = prefix.entry_len();
self.update_restart_point(index);

self.update_hitmap();
self.hitmap
.fill_with_range(self.offset, self.value_range.end, self.block.len());
}

fn update_restart_point(&mut self, index: usize) {
Expand All @@ -301,12 +326,6 @@ impl BlockIterator {
self.last_key_len_type = restart_point.key_len_type;
self.last_value_len_type = restart_point.value_len_type;
}

/// Marks the range from `self.offset` to `self.value_range.end`, relative to the
/// block length, in the iterator's local hitmap.
fn update_hitmap(&mut self) {
    let range_start = self.offset;
    let range_end = self.value_range.end;
    let block_len = self.block.len();
    self.hitmap.fill_with_range(range_start, range_end, block_len);
}
}

#[cfg(test)]
Expand Down
41 changes: 29 additions & 12 deletions src/storage/src/monitor/hitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@ impl<const N: usize> Hitmap<N> {
N * 8
}

pub fn local(&self) -> LocalHitmap<N> {
LocalHitmap::new(self)
}

pub fn apply(&self, local: &LocalHitmap<N>) {
pub fn report(&self, local: &mut LocalHitmap<N>) {
for (global, local) in self.data.iter().zip_eq_fast(local.data.iter()) {
global.fetch_or(*local, Ordering::Relaxed);
}
local.reset();
}

pub fn ones(&self) -> usize {
Expand Down Expand Up @@ -80,10 +77,15 @@ impl<const N: usize> Hitmap<N> {

#[derive(Debug)]
pub struct LocalHitmap<const N: usize> {
owner: Hitmap<N>,
data: Box<[u64; N]>,
}

impl<const N: usize> Default for LocalHitmap<N> {
fn default() -> Self {
Self::new()
}
}

impl<const N: usize> LocalHitmap<N> {
pub const fn bits() -> usize {
N * u64::BITS as usize
Expand All @@ -93,17 +95,29 @@ impl<const N: usize> LocalHitmap<N> {
N * 8
}

pub fn new(owner: &Hitmap<N>) -> Self {
pub fn new() -> Self {
Self {
owner: owner.clone(),
data: Box::new([0; N]),
}
}

/// Clears the hitmap by setting every `u64` word in `data` back to zero.
pub fn reset(&mut self) {
    self.data.fill(0);
}

/// Bitwise-ORs every word of `other` into `self`, then zeroes `other` via
/// [`Self::reset`], so the bits end up only in `self`.
pub fn merge(&mut self, other: &mut Self) {
    self.data
        .iter_mut()
        .zip_eq_fast(other.data.iter())
        .for_each(|(mine, theirs)| *mine |= *theirs);
    other.reset();
}

pub fn fill(&mut self, start_bit: usize, end_bit: usize) {
const MASK: usize = (1 << 6) - 1;

let end_bit = std::cmp::min(end_bit, Self::bits());
let end_bit = end_bit.clamp(start_bit + 1, Self::bits());

let head_bits = start_bit & MASK;
let tail_bits_rev = end_bit & MASK;
Expand Down Expand Up @@ -153,9 +167,12 @@ impl<const N: usize> LocalHitmap<N> {
}
}

#[cfg(debug_assertions)]
impl<const N: usize> Drop for LocalHitmap<N> {
fn drop(&mut self) {
self.owner.apply(self);
if self.ones() > 0 {
panic!("LocalHitmap is not reported!");
}
}
}

Expand All @@ -168,7 +185,7 @@ mod tests {
// hex: high <== low
let g = Hitmap::<4>::default();

let mut h = g.local();
let mut h = LocalHitmap::new();
assert_eq!(
h.to_hex_vec(),
vec![
Expand Down Expand Up @@ -223,7 +240,7 @@ mod tests {
]
);
assert_eq!(h.ones(), 256);
drop(h);
g.report(&mut h);
assert_eq!(
g.to_hex_vec(),
vec![
Expand Down