Skip to content

Commit 7e2bad7

Browse files
committed
feat: encode and decode BlockedBloomFilter
1 parent 00b1d6e commit 7e2bad7

File tree

4 files changed

+126
-22
lines changed

4 files changed

+126
-22
lines changed

benches/bloom.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ fn blocked_filter_construction(c: &mut Criterion) {
6363
}
6464

6565
fn blocked_filter_contains(c: &mut Criterion) {
66-
use lsm_tree::segment::filter::blocked_bloom::Builder;
66+
use lsm_tree::segment::filter::{blocked_bloom::Builder, AMQFilter};
6767

6868
let keys = (0..100_000u128)
6969
.map(|x| x.to_be_bytes().to_vec())

src/segment/filter/blocked_bloom/mod.rs

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@
33
// (found in the LICENSE-* files in the repository)
44

55
mod builder;
6-
use super::{bit_array::BitArrayReader, CACHE_LINE_BYTES};
6+
use super::{bit_array::BitArrayReader, AMQFilter, CACHE_LINE_BYTES};
7+
use crate::{
8+
coding::{DecodeError, Encode, EncodeError},
9+
file::MAGIC_BYTES,
10+
};
711
pub use builder::Builder;
12+
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13+
use std::io::{Read, Write};
814

915
/// Two hashes that are used for double hashing
1016
pub type CompositeHash = (u64, u64);
@@ -20,29 +26,22 @@ pub struct BlockedBloomFilter {
2026
num_blocks: usize,
2127
}
2228

23-
// TODO: Implement Encode and Decode for BlockedBloomFilter
29+
impl AMQFilter for BlockedBloomFilter {
30+
/// Returns the raw bytes backing the filter, as exposed by the inner bit array reader.
fn bytes(&self) -> &[u8] {
31+
self.inner.bytes()
32+
}
2433

25-
impl BlockedBloomFilter {
2634
/// Size of bloom filter in bytes
2735
#[must_use]
28-
pub fn len(&self) -> usize {
36+
fn len(&self) -> usize {
2937
self.inner.bytes().len()
3038
}
3139

32-
fn from_raw(m: usize, k: usize, slice: crate::Slice) -> Self {
33-
let num_blocks = m.div_ceil(CACHE_LINE_BYTES);
34-
Self {
35-
inner: BitArrayReader::new(slice),
36-
k,
37-
num_blocks,
38-
}
39-
}
40-
4140
/// Returns `true` if the hash may be contained.
4241
///
4342
/// Will never have a false negative.
4443
#[must_use]
45-
pub fn contains_hash(&self, (mut h1, mut h2): CompositeHash) -> bool {
44+
fn contains_hash(&self, (mut h1, mut h2): CompositeHash) -> bool {
4645
let block_idx = h1 % (self.num_blocks as u64);
4746

4847
for i in 1..(self.k as u64) {
@@ -65,10 +64,56 @@ impl BlockedBloomFilter {
6564
///
6665
/// Will never have a false negative.
6766
#[must_use]
68-
pub fn contains(&self, key: &[u8]) -> bool {
67+
fn contains(&self, key: &[u8]) -> bool {
6968
self.contains_hash(Self::get_hash(key))
7069
}
7170

71+
/// Reports this filter's kind, which is always `FilterType::BlockedBloom`.
fn filter_type(&self) -> super::FilterType {
72+
super::FilterType::BlockedBloom
73+
}
74+
}
75+
76+
impl Encode for BlockedBloomFilter {
77+
fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError> {
78+
// Write header
79+
writer.write_all(&MAGIC_BYTES)?;
80+
81+
writer.write_u8(super::FilterType::BlockedBloom as u8)?;
82+
83+
// NOTE: Hash type (unused)
84+
writer.write_u8(0)?;
85+
86+
writer.write_u64::<LittleEndian>(self.num_blocks as u64)?;
87+
writer.write_u64::<LittleEndian>(self.k as u64)?;
88+
writer.write_all(self.inner.bytes())?;
89+
90+
Ok(())
91+
}
92+
}
93+
94+
impl BlockedBloomFilter {
95+
// To be used by AMQFilter after magic bytes and filter type have been read and parsed
96+
pub(super) fn decode_from<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
97+
// NOTE: Hash type (unused)
98+
let hash_type = reader.read_u8()?;
99+
assert_eq!(0, hash_type, "Invalid bloom hash type");
100+
101+
let num_blocks = reader.read_u64::<LittleEndian>()? as usize;
102+
let k = reader.read_u64::<LittleEndian>()? as usize;
103+
104+
let mut bytes = vec![0; num_blocks * CACHE_LINE_BYTES];
105+
reader.read_exact(&mut bytes)?;
106+
107+
Ok(Self::from_raw(num_blocks, k, bytes.into()))
108+
}
109+
110+
fn from_raw(num_blocks: usize, k: usize, slice: crate::Slice) -> Self {
111+
Self {
112+
inner: BitArrayReader::new(slice),
113+
k,
114+
num_blocks,
115+
}
116+
}
72117
/// Returns `true` if the bit at `idx` is `1`.
73118
fn has_bit(&self, block_idx: usize, idx_in_block: usize) -> bool {
74119
self.inner
@@ -84,6 +129,57 @@ impl BlockedBloomFilter {
84129
#[cfg(test)]
85130
mod tests {
86131
use super::*;
132+
use crate::segment::filter::{AMQFilterBuilder, FilterType};
133+
134+
use std::fs::File;
135+
use test_log::test;
136+
137+
#[test]
fn blocked_bloom_serde_round_trip() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;

    let path = dir.path().join("bf");
    let mut file = File::create(&path)?;

    let mut filter = Builder::with_fp_rate(10, 0.0001);

    let keys = &[
        b"item0", b"item1", b"item2", b"item3", b"item4", b"item5", b"item6", b"item7",
        b"item8", b"item9",
    ];

    for key in keys {
        filter.set_with_hash(BlockedBloomFilter::get_hash(*key));
    }

    let filter = filter.build();

    // Sanity-check the freshly built filter before serializing it.
    for key in keys {
        assert!(filter.contains(&**key));
    }
    assert!(!filter.contains(b"asdasads"));
    assert!(!filter.contains(b"item10"));
    assert!(!filter.contains(b"cxycxycxy"));

    filter.encode_into(&mut file)?;
    file.sync_all()?;
    drop(file);

    let mut file = File::open(&path)?;
    let filter_copy = AMQFilterBuilder::decode_from(&mut file)?;

    assert_eq!(filter.inner.bytes(), filter_copy.bytes());
    assert_eq!(FilterType::BlockedBloom, filter_copy.filter_type());

    // The decoded copy (not the original) must still report every inserted key.
    for key in keys {
        assert!(filter_copy.contains(&**key));
    }
    assert!(!filter_copy.contains(b"asdasads"));
    assert!(!filter_copy.contains(b"item10"));
    assert!(!filter_copy.contains(b"cxycxycxy"));

    Ok(())
}
87183

88184
#[test]
89185
fn blocked_bloom_basic() {

src/segment/filter/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod blocked_bloom;
77
pub mod standard_bloom;
88

99
use crate::{coding::DecodeError, file::MAGIC_BYTES};
10+
use blocked_bloom::BlockedBloomFilter;
1011
use byteorder::ReadBytesExt;
1112
use std::io::Read;
1213

@@ -46,7 +47,8 @@ impl BloomConstructionPolicy {
4647
}
4748
}
4849

49-
enum FilterType {
50+
/// Identifies the concrete filter implementation.
///
/// The discriminant is the single tag byte written after the magic bytes when a
/// filter is encoded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterType {
    /// Standard bloom filter (tag `0`).
    StandardBloom = 0,
    /// Blocked bloom filter (tag `1`).
    BlockedBloom = 1,
}
@@ -67,6 +69,7 @@ pub trait AMQFilter: Sync + Send {
6769
fn len(&self) -> usize;
6870
fn contains(&self, item: &[u8]) -> bool;
6971
fn contains_hash(&self, hash: (u64, u64)) -> bool;
72+
fn filter_type(&self) -> FilterType;
7073
}
7174

7275
pub struct AMQFilterBuilder {}
@@ -89,8 +92,9 @@ impl AMQFilterBuilder {
8992
FilterType::StandardBloom => StandardBloomFilter::decode_from(reader)
9093
.map(Self::wrap_filter)
9194
.map_err(|e| DecodeError::from(e)),
92-
// TODO: Implement
93-
FilterType::BlockedBloom => Err(DecodeError::InvalidHeader("BlockedBloom")),
95+
FilterType::BlockedBloom => BlockedBloomFilter::decode_from(reader)
96+
.map(Self::wrap_filter)
97+
.map_err(|e| DecodeError::from(e)),
9498
}
9599
}
96100

src/segment/filter/standard_bloom/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,18 @@ impl AMQFilter for StandardBloomFilter {
7272

7373
true
7474
}
75+
76+
/// Reports this filter's kind, which is always `FilterType::StandardBloom`.
fn filter_type(&self) -> super::FilterType {
77+
super::FilterType::StandardBloom
78+
}
7579
}
7680

7781
impl Encode for StandardBloomFilter {
7882
fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError> {
7983
// Write header
8084
writer.write_all(&MAGIC_BYTES)?;
8185

82-
// NOTE: Filter type (unused)
83-
writer.write_u8(0)?;
86+
writer.write_u8(super::FilterType::StandardBloom as u8)?;
8487

8588
// NOTE: Hash type (unused)
8689
writer.write_u8(0)?;
@@ -131,7 +134,7 @@ impl StandardBloomFilter {
131134

132135
#[cfg(test)]
133136
mod tests {
134-
use crate::segment::filter::AMQFilterBuilder;
137+
use crate::segment::filter::{AMQFilterBuilder, FilterType};
135138

136139
use super::*;
137140
use std::fs::File;
@@ -172,6 +175,7 @@ mod tests {
172175
let filter_copy = AMQFilterBuilder::decode_from(&mut file)?;
173176

174177
assert_eq!(filter.inner.bytes(), filter_copy.bytes());
178+
assert_eq!(FilterType::StandardBloom, filter_copy.filter_type());
175179

176180
for key in keys {
177181
assert!(filter.contains(&**key));

0 commit comments

Comments
 (0)