Skip to content

Commit bad48d8

Browse files
0xdeafbeefzaidoon1
authored andcommitted
implement with_capacity for WriteBatch (#947)
* implement `with_capacity` for `WriteBatch` * add tests for WriteBatch::with_capacity * update changelog - remove `clippy.toml` because we already have `msrv` in `Cargo.toml`
1 parent 1e3538d commit bad48d8

File tree

3 files changed

+161
-140
lines changed

3 files changed

+161
-140
lines changed

clippy.toml

-1
This file was deleted.

src/write_batch.rs

+17-3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_
8989
}
9090

9191
impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
92+
/// Create a new `WriteBatch` without allocating memory.
93+
pub fn new() -> Self {
94+
Self {
95+
inner: unsafe { ffi::rocksdb_writebatch_create() },
96+
}
97+
}
98+
99+
/// Creates `WriteBatch` with the specified `capacity` in bytes. Allocates immediately.
100+
pub fn with_capacity_bytes(capacity_bytes: usize) -> Self {
101+
Self {
102+
// zeroes from default constructor
103+
// https://github.com/facebook/rocksdb/blob/0f35db55d86ea8699ea936c9e2a4e34c82458d6b/include/rocksdb/write_batch.h#L66
104+
inner: unsafe { ffi::rocksdb_writebatch_create_with_params(capacity_bytes, 0, 0, 0) },
105+
}
106+
}
107+
92108
/// Construct with a reference to a byte array serialized by [`WriteBatch`].
93109
pub fn from_data(data: &[u8]) -> Self {
94110
unsafe {
@@ -354,9 +370,7 @@ impl WriteBatchWithTransaction<false> {
354370

355371
impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
356372
fn default() -> Self {
357-
Self {
358-
inner: unsafe { ffi::rocksdb_writebatch_create() },
359-
}
373+
Self::new()
360374
}
361375
}
362376

tests/test_db.rs

+144-136
Original file line numberDiff line numberDiff line change
@@ -535,29 +535,31 @@ fn test_get_updates_since_multiple_batches() {
535535

536536
#[test]
537537
fn test_get_updates_since_one_batch() {
538-
let path = DBPath::new("_rust_rocksdb_test_get_updates_since_one_batch");
539-
let db = DB::open_default(&path).unwrap();
540-
db.put(b"key2", b"value2").unwrap();
541-
// some puts and deletes in a single batch,
542-
// verify 1 put and 1 delete were done
543-
let seq1 = db.latest_sequence_number();
544-
assert_eq!(seq1, 1);
545-
let mut batch = WriteBatch::default();
546-
batch.put(b"key1", b"value1");
547-
batch.delete(b"key2");
548-
db.write(batch).unwrap();
549-
assert_eq!(db.latest_sequence_number(), 3);
550-
let mut iter = db.get_updates_since(seq1).unwrap();
551-
let mut counts = OperationCounts {
552-
puts: 0,
553-
deletes: 0,
554-
};
555-
let (seq, batch) = iter.next().unwrap().unwrap();
556-
assert_eq!(seq, 2);
557-
batch.iterate(&mut counts);
558-
assert!(iter.next().is_none());
559-
assert_eq!(counts.puts, 1);
560-
assert_eq!(counts.deletes, 1);
538+
let batches = [WriteBatch::default(), WriteBatch::with_capacity_bytes(13)];
539+
for mut batch in batches {
540+
let path = DBPath::new("_rust_rocksdb_test_get_updates_since_one_batch");
541+
let db = DB::open_default(&path).unwrap();
542+
db.put(b"key2", b"value2").unwrap();
543+
// some puts and deletes in a single batch,
544+
// verify 1 put and 1 delete were done
545+
let seq1 = db.latest_sequence_number();
546+
assert_eq!(seq1, 1);
547+
batch.put(b"key1", b"value1");
548+
batch.delete(b"key2");
549+
db.write(batch).unwrap();
550+
assert_eq!(db.latest_sequence_number(), 3);
551+
let mut iter = db.get_updates_since(seq1).unwrap();
552+
let mut counts = OperationCounts {
553+
puts: 0,
554+
deletes: 0,
555+
};
556+
let (seq, batch) = iter.next().unwrap().unwrap();
557+
assert_eq!(seq, 2);
558+
batch.iterate(&mut counts);
559+
assert!(iter.next().is_none());
560+
assert_eq!(counts.puts, 1);
561+
assert_eq!(counts.deletes, 1);
562+
}
561563
}
562564

563565
#[test]
@@ -953,134 +955,140 @@ fn prefix_extract_and_iterate_test() {
953955

954956
#[test]
955957
fn get_with_cache_and_bulkload_test() {
956-
let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_test");
957-
let log_path = DBPath::new("_rust_rocksdb_log_path_test");
958-
959-
// create options
960-
let mut opts = Options::default();
961-
opts.create_if_missing(true);
962-
opts.create_missing_column_families(true);
963-
opts.set_wal_bytes_per_sync(8 << 10); // 8KB
964-
opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
965-
opts.set_enable_write_thread_adaptive_yield(true);
966-
opts.set_unordered_write(true);
967-
opts.set_max_subcompactions(2);
968-
opts.set_max_background_jobs(4);
969-
opts.set_use_adaptive_mutex(true);
970-
opts.set_db_log_dir(&log_path);
971-
opts.set_memtable_whole_key_filtering(true);
972-
opts.set_dump_malloc_stats(true);
973-
opts.set_level_compaction_dynamic_level_bytes(false);
974-
975-
// trigger all sst files in L1/2 instead of L0
976-
opts.set_max_bytes_for_level_base(64 << 10); // 64KB
977-
978-
{
979-
// set block based table and cache
980-
let cache = Cache::new_lru_cache(512 << 10);
981-
assert_eq!(cache.get_usage(), 0);
982-
let mut block_based_opts = BlockBasedOptions::default();
983-
block_based_opts.set_block_cache(&cache);
984-
block_based_opts.set_cache_index_and_filter_blocks(true);
985-
opts.set_block_based_table_factory(&block_based_opts);
986-
987-
// open db
988-
let db = DB::open(&opts, &path).unwrap();
989-
990-
// write a lot
991-
let mut batch = WriteBatch::default();
992-
for i in 0..10_000 {
993-
batch.put(format!("{i:0>4}").as_bytes(), b"v");
994-
}
995-
assert!(db.write(batch).is_ok());
958+
let batches = [
959+
WriteBatch::default(),
960+
WriteBatch::with_capacity_bytes(13),
961+
WriteBatch::with_capacity_bytes(100_000),
962+
];
963+
for mut batch in batches {
964+
let path = DBPath::new("_rust_rocksdb_get_with_cache_and_bulkload_test");
965+
let log_path = DBPath::new("_rust_rocksdb_log_path_test");
966+
967+
// create options
968+
let mut opts = Options::default();
969+
opts.create_if_missing(true);
970+
opts.create_missing_column_families(true);
971+
opts.set_wal_bytes_per_sync(8 << 10); // 8KB
972+
opts.set_writable_file_max_buffer_size(512 << 10); // 512KB
973+
opts.set_enable_write_thread_adaptive_yield(true);
974+
opts.set_unordered_write(true);
975+
opts.set_max_subcompactions(2);
976+
opts.set_max_background_jobs(4);
977+
opts.set_use_adaptive_mutex(true);
978+
opts.set_db_log_dir(&log_path);
979+
opts.set_memtable_whole_key_filtering(true);
980+
opts.set_dump_malloc_stats(true);
981+
opts.set_level_compaction_dynamic_level_bytes(false);
996982

997-
// flush memory table to sst and manual compaction
998-
assert!(db.flush().is_ok());
999-
db.compact_range(Some(format!("{:0>4}", 0).as_bytes()), None::<Vec<u8>>);
983+
// trigger all sst files in L1/2 instead of L0
984+
opts.set_max_bytes_for_level_base(64 << 10); // 64KB
1000985

1001-
// get -> trigger caching
1002-
let _ = db.get(b"1");
1003-
assert!(cache.get_usage() > 0);
986+
{
987+
// set block based table and cache
988+
let cache = Cache::new_lru_cache(512 << 10);
989+
assert_eq!(cache.get_usage(), 0);
990+
let mut block_based_opts = BlockBasedOptions::default();
991+
block_based_opts.set_block_cache(&cache);
992+
block_based_opts.set_cache_index_and_filter_blocks(true);
993+
opts.set_block_based_table_factory(&block_based_opts);
994+
995+
// open db
996+
let db = DB::open(&opts, &path).unwrap();
997+
998+
// write a lot
999+
for i in 0..10_000 {
1000+
batch.put(format!("{i:0>4}").as_bytes(), b"v");
1001+
}
1002+
assert!(db.write(batch).is_ok());
10041003

1005-
// get approximated memory usage
1006-
let mem_usage = get_memory_usage_stats(Some(&[&db]), None).unwrap();
1007-
assert!(mem_usage.mem_table_total > 0);
1004+
// flush memory table to sst and manual compaction
1005+
assert!(db.flush().is_ok());
1006+
db.compact_range(Some(format!("{:0>4}", 0).as_bytes()), None::<Vec<u8>>);
10081007

1009-
// get approximated cache usage
1010-
let mem_usage = get_memory_usage_stats(None, Some(&[&cache])).unwrap();
1011-
assert!(mem_usage.cache_total > 0);
1012-
}
1008+
// get -> trigger caching
1009+
let _ = db.get(b"1");
1010+
assert!(cache.get_usage() > 0);
10131011

1014-
// bulk loading
1015-
{
1016-
// open db
1017-
let db = DB::open(&opts, &path).unwrap();
1012+
// get approximated memory usage
1013+
let mem_usage = get_memory_usage_stats(Some(&[&db]), None).unwrap();
1014+
assert!(mem_usage.mem_table_total > 0);
10181015

1019-
// try to get key
1020-
let iter = db.iterator(IteratorMode::Start);
1021-
for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
1022-
assert_eq!(k.as_ref(), format!("{expected:0>4}").as_bytes());
1016+
// get approximated cache usage
1017+
let mem_usage = get_memory_usage_stats(None, Some(&[&cache])).unwrap();
1018+
assert!(mem_usage.cache_total > 0);
10231019
}
10241020

1025-
// check live files (sst files meta)
1026-
let livefiles = db.live_files().unwrap();
1027-
assert_eq!(livefiles.len(), 1);
1028-
livefiles.iter().for_each(|f| {
1029-
assert_eq!(f.level, 2);
1030-
assert_eq!(f.column_family_name, "default");
1031-
assert!(!f.name.is_empty());
1032-
assert_eq!(
1033-
f.start_key.as_ref().unwrap().as_slice(),
1034-
format!("{:0>4}", 0).as_bytes()
1035-
);
1036-
assert_eq!(
1037-
f.end_key.as_ref().unwrap().as_slice(),
1038-
format!("{:0>4}", 9999).as_bytes()
1039-
);
1040-
assert_eq!(f.num_entries, 10000);
1041-
assert_eq!(f.num_deletions, 0);
1042-
});
1021+
// bulk loading
1022+
{
1023+
// open db
1024+
let db = DB::open(&opts, &path).unwrap();
10431025

1044-
// delete sst file in range (except L0)
1045-
assert!(db
1046-
.delete_file_in_range(
1047-
format!("{:0>4}", 0).as_bytes(),
1048-
format!("{:0>4}", 9999).as_bytes()
1049-
)
1050-
.is_ok());
1051-
let livefiles = db.live_files().unwrap();
1052-
assert_eq!(livefiles.len(), 0);
1026+
// try to get key
1027+
let iter = db.iterator(IteratorMode::Start);
1028+
for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
1029+
assert_eq!(k.as_ref(), format!("{expected:0>4}").as_bytes());
1030+
}
10531031

1054-
// try to get a deleted key
1055-
assert!(db.get(format!("{:0>4}", 123).as_bytes()).unwrap().is_none());
1056-
}
1032+
// check live files (sst files meta)
1033+
let livefiles = db.live_files().unwrap();
1034+
assert_eq!(livefiles.len(), 1);
1035+
livefiles.iter().for_each(|f| {
1036+
assert_eq!(f.level, 2);
1037+
assert_eq!(f.column_family_name, "default");
1038+
assert!(!f.name.is_empty());
1039+
assert_eq!(
1040+
f.start_key.as_ref().unwrap().as_slice(),
1041+
format!("{:0>4}", 0).as_bytes()
1042+
);
1043+
assert_eq!(
1044+
f.end_key.as_ref().unwrap().as_slice(),
1045+
format!("{:0>4}", 9999).as_bytes()
1046+
);
1047+
assert_eq!(f.num_entries, 10000);
1048+
assert_eq!(f.num_deletions, 0);
1049+
});
1050+
1051+
// delete sst file in range (except L0)
1052+
assert!(db
1053+
.delete_file_in_range(
1054+
format!("{:0>4}", 0).as_bytes(),
1055+
format!("{:0>4}", 9999).as_bytes()
1056+
)
1057+
.is_ok());
1058+
let livefiles = db.live_files().unwrap();
1059+
assert_eq!(livefiles.len(), 0);
1060+
1061+
// try to get a deleted key
1062+
assert!(db.get(format!("{:0>4}", 123).as_bytes()).unwrap().is_none());
1063+
}
10571064

1058-
// raise error when db exists
1059-
{
1060-
opts.set_error_if_exists(true);
1061-
assert!(DB::open(&opts, &path).is_err());
1062-
}
1065+
// raise error when db exists
1066+
{
1067+
opts.set_error_if_exists(true);
1068+
assert!(DB::open(&opts, &path).is_err());
1069+
}
10631070

1064-
// disable all threads
1065-
{
1066-
// create new options
1067-
let mut opts = Options::default();
1068-
opts.set_max_background_jobs(0);
1069-
opts.set_stats_dump_period_sec(0);
1070-
opts.set_stats_persist_period_sec(0);
1071+
// disable all threads
1072+
{
1073+
// create new options
1074+
let mut opts = Options::default();
1075+
opts.set_max_background_jobs(0);
1076+
opts.set_stats_dump_period_sec(0);
1077+
opts.set_stats_persist_period_sec(0);
10711078

1072-
// test Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
1073-
let mut env = Env::new().unwrap();
1074-
env.set_bottom_priority_background_threads(0);
1075-
opts.set_env(&env);
1079+
// test Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
1080+
let mut env = Env::new().unwrap();
1081+
env.set_bottom_priority_background_threads(0);
1082+
opts.set_env(&env);
10761083

1077-
// open db
1078-
let db = DB::open(&opts, &path).unwrap();
1084+
// open db
1085+
let db = DB::open(&opts, &path).unwrap();
10791086

1080-
// try to get key
1081-
let iter = db.iterator(IteratorMode::Start);
1082-
for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
1083-
assert_eq!(k.as_ref(), format!("{expected:0>4}").as_bytes());
1087+
// try to get key
1088+
let iter = db.iterator(IteratorMode::Start);
1089+
for (expected, (k, _)) in iter.map(Result::unwrap).enumerate() {
1090+
assert_eq!(k.as_ref(), format!("{expected:0>4}").as_bytes());
1091+
}
10841092
}
10851093
}
10861094
}

0 commit comments

Comments
 (0)