Skip to content

Commit 962b7de

Browse files
authored
Implement safe storage handles (#7934)
This implements `ChunkStoreHandle` & `QueryCacheHandle` which, among other things, allow for streaming dataframes across FFI and network barriers.

```rust
/// A ref-counted, inner-mutable handle to a [`QueryCache`].
///
/// Cheap to clone.
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`QueryCacheHandle::read_arc`]
/// * [`QueryCacheHandle::write_arc`]
#[derive(Clone)]
pub struct QueryCacheHandle(Arc<parking_lot::RwLock<QueryCache>>);

/// A ref-counted, inner-mutable handle to a [`ChunkStore`].
///
/// Cheap to clone.
///
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
/// * [`ChunkStoreHandle::read_arc`]
/// * [`ChunkStoreHandle::write_arc`]
#[derive(Clone)]
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);
```

Those handles on their own are extremely problematic though: letting them loose all across the codebase effectively wraps the entire codebase in a semantically-unsafe{} block where every line interacting with these handles can lead to very nasty race conditions and even deadlocks.

That's why this PR also introduces the `StorageEngine` type, which makes using these handles actually safe in practice:

```rust
/// Keeps track of handles towards a [`ChunkStore`] and its [`QueryCache`].
///
/// A [`StorageEngine`] doesn't add any feature on top of what [`ChunkStoreHandle`] and
/// [`QueryCacheHandle`] already offer: the job of the [`StorageEngine`] is to leverage the type
/// system in order to protect against deadlocks and race conditions at compile time.
///
/// The handles stored within will never be publicly accessible past construction.
///
/// The underlying [`ChunkStore`] and [`QueryCache`] can be accessed through one of the
/// following methods:
/// * [`StorageEngine::read`]
/// * [`StorageEngine::read_arc`]
/// * [`StorageEngine::write`]
/// * [`StorageEngine::write_arc`]
#[derive(Clone)]
pub struct StorageEngine {
    store: ChunkStoreHandle,
    cache: QueryCacheHandle,
}
```

Balancing these safety guarantees with the flexibility we need (today... and tomorrow!) for all our corner use cases has proven to be a subtle art...

* Fixes #7486
1 parent 2971d34 commit 962b7de

File tree

70 files changed

+1233
-691
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1233
-691
lines changed

Cargo.lock

-1
Original file line numberDiff line numberDiff line change
@@ -5703,7 +5703,6 @@ dependencies = [
57035703
"re_build_info",
57045704
"re_chunk",
57055705
"re_chunk_store",
5706-
"re_dataframe",
57075706
"re_format",
57085707
"re_int_histogram",
57095708
"re_log",

bacon.toml

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ command = [
1616
need_stdout = false
1717
watch = ["tests", "benches", "examples"]
1818

19+
[jobs.libs]
20+
command = ["cargo", "clippy", "--lib", "--all-features", "--color=always"]
21+
need_stdout = false
22+
watch = ["tests", "benches", "examples"]
23+
1924
[jobs.wasm]
2025
command = [
2126
"cargo",

crates/store/re_chunk_store/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ indent.workspace = true
4545
itertools.workspace = true
4646
nohash-hasher.workspace = true
4747
once_cell.workspace = true
48-
parking_lot.workspace = true
48+
parking_lot = { workspace = true, features = ["arc_lock"] }
4949
thiserror.workspace = true
5050
web-time.workspace = true
5151

crates/store/re_chunk_store/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ pub use self::dataframe::{
3232
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
3333
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
3434
pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats};
35-
pub use self::store::{ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ColumnMetadata};
35+
pub use self::store::{
36+
ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata,
37+
};
3638
pub use self::subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle};
3739

3840
pub(crate) use self::store::ColumnMetadataState;

crates/store/re_chunk_store/src/store.rs

+88-3
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,58 @@ pub struct ChunkStoreGeneration {
310310
gc_id: u64,
311311
}
312312

313+
/// A ref-counted, inner-mutable handle to a [`ChunkStore`].
314+
///
315+
/// Cheap to clone.
316+
///
317+
/// It is possible to grab the lock behind this handle while _maintaining a static lifetime_, see:
318+
/// * [`ChunkStoreHandle::read_arc`]
319+
/// * [`ChunkStoreHandle::write_arc`]
320+
#[derive(Clone)]
321+
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);
322+
323+
impl std::fmt::Display for ChunkStoreHandle {
324+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325+
f.write_fmt(format_args!("{}", self.0.read()))
326+
}
327+
}
328+
329+
impl ChunkStoreHandle {
330+
#[inline]
331+
pub fn new(store: ChunkStore) -> Self {
332+
Self(Arc::new(parking_lot::RwLock::new(store)))
333+
}
334+
335+
#[inline]
336+
pub fn into_inner(self) -> Arc<parking_lot::RwLock<ChunkStore>> {
337+
self.0
338+
}
339+
}
340+
341+
impl ChunkStoreHandle {
342+
#[inline]
343+
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
344+
self.0.read()
345+
}
346+
347+
#[inline]
348+
pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
349+
self.0.write()
350+
}
351+
352+
#[inline]
353+
pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
354+
parking_lot::RwLock::read_arc(&self.0)
355+
}
356+
357+
#[inline]
358+
pub fn write_arc(
359+
&self,
360+
) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
361+
parking_lot::RwLock::write_arc(&self.0)
362+
}
363+
}
364+
313365
/// A complete chunk store: covers all timelines, all entities, everything.
314366
///
315367
/// The chunk store _always_ works at the chunk level, whether it is for write & read queries or
@@ -473,6 +525,7 @@ impl ChunkStore {
473525
/// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
474526
///
475527
/// See also:
528+
/// * [`ChunkStore::new_handle`]
476529
/// * [`ChunkStore::from_rrd_filepath`]
477530
#[inline]
478531
pub fn new(id: StoreId, config: ChunkStoreConfig) -> Self {
@@ -496,9 +549,20 @@ impl ChunkStore {
496549
}
497550
}
498551

552+
/// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
553+
///
554+
/// Pre-wraps the result in a [`ChunkStoreHandle`].
555+
///
556+
/// See also:
557+
/// * [`ChunkStore::from_rrd_filepath`]
499558
#[inline]
500-
pub fn id(&self) -> &StoreId {
501-
&self.id
559+
pub fn new_handle(id: StoreId, config: ChunkStoreConfig) -> ChunkStoreHandle {
560+
ChunkStoreHandle::new(Self::new(id, config))
561+
}
562+
563+
#[inline]
564+
pub fn id(&self) -> StoreId {
565+
self.id.clone()
502566
}
503567

504568
#[inline]
@@ -591,7 +655,7 @@ impl ChunkStore {
591655
impl ChunkStore {
592656
/// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
593657
///
594-
/// The store will be prefilled using the data at the specified path.
658+
/// The stores will be prefilled with the data at the specified path.
595659
///
596660
/// See also:
597661
/// * [`ChunkStore::new`]
@@ -650,4 +714,25 @@ impl ChunkStore {
650714

651715
Ok(stores)
652716
}
717+
718+
/// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
719+
///
720+
/// Wraps the results in [`ChunkStoreHandle`]s.
721+
///
722+
/// The stores will be prefilled with the data at the specified path.
723+
///
724+
/// See also:
725+
/// * [`ChunkStore::new_handle`]
726+
pub fn handle_from_rrd_filepath(
727+
store_config: &ChunkStoreConfig,
728+
path_to_rrd: impl AsRef<std::path::Path>,
729+
version_policy: crate::VersionPolicy,
730+
) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
731+
Ok(
732+
Self::from_rrd_filepath(store_config, path_to_rrd, version_policy)?
733+
.into_iter()
734+
.map(|(store_id, store)| (store_id, ChunkStoreHandle::new(store)))
735+
.collect(),
736+
)
737+
}
653738
}

crates/store/re_dataframe/examples/query.rs

+7-15
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
use itertools::Itertools;
44

5-
use re_chunk::TimeInt;
6-
use re_chunk_store::{
7-
ChunkStore, ChunkStoreConfig, QueryExpression, SparseFillStrategy, Timeline, VersionPolicy,
5+
use re_dataframe::{
6+
ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange,
7+
SparseFillStrategy, StoreKind, TimeInt, Timeline, VersionPolicy,
88
};
9-
use re_dataframe::{QueryCache, QueryEngine};
10-
use re_log_types::{EntityPathFilter, ResolvedTimeRange, StoreKind};
119

1210
fn main() -> anyhow::Result<()> {
1311
let args = std::env::args().collect_vec();
@@ -42,27 +40,21 @@ fn main() -> anyhow::Result<()> {
4240
_ => Timeline::new_temporal(timeline_name),
4341
};
4442

45-
let stores = ChunkStore::from_rrd_filepath(
43+
let engines = QueryEngine::from_rrd_filepath(
4644
&ChunkStoreConfig::DEFAULT,
4745
path_to_rrd,
4846
VersionPolicy::Warn,
4947
)?;
5048

51-
for (store_id, store) in &stores {
49+
for (store_id, engine) in &engines {
5250
if store_id.kind != StoreKind::Recording {
5351
continue;
5452
}
5553

56-
let query_cache = QueryCache::new(store);
57-
let query_engine = QueryEngine {
58-
store,
59-
cache: &query_cache,
60-
};
61-
6254
let query = QueryExpression {
6355
filtered_index: Some(timeline),
6456
view_contents: Some(
65-
query_engine
57+
engine
6658
.iter_entity_paths(&entity_path_filter)
6759
.map(|entity_path| (entity_path, None))
6860
.collect(),
@@ -73,7 +65,7 @@ fn main() -> anyhow::Result<()> {
7365
};
7466
eprintln!("{query:#?}:");
7567

76-
let query_handle = query_engine.query(query.clone());
68+
let query_handle = engine.query(query.clone());
7769
// eprintln!("{:#?}", query_handle.selected_contents());
7870
for batch in query_handle.into_batch_iter() {
7971
eprintln!("{batch}");
+59-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
use std::collections::BTreeMap;
2+
13
use re_chunk::{EntityPath, TransportChunk};
2-
use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
3-
use re_log_types::EntityPathFilter;
4-
use re_query::QueryCache;
4+
use re_chunk_store::{
5+
ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression,
6+
VersionPolicy,
7+
};
8+
use re_log_types::{EntityPathFilter, StoreId};
9+
use re_query::{QueryCache, QueryCacheHandle, StorageEngine, StorageEngineLike};
510

611
use crate::QueryHandle;
712

@@ -15,26 +20,57 @@ use re_chunk_store::ComponentColumnDescriptor;
1520
// `TransportChunk` type until we migrate to `arrow-rs`.
1621
// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime)
1722
// will be trivial.
18-
// TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day.
1923
pub type RecordBatch = TransportChunk;
2024

2125
// --- Queries ---
2226

2327
/// A handle to our user-facing query engine.
2428
///
29+
/// Cheap to clone.
30+
///
2531
/// See the following methods:
2632
/// * [`QueryEngine::schema`]: get the complete schema of the recording.
2733
/// * [`QueryEngine::query`]: execute a [`QueryExpression`] on the recording.
28-
//
29-
// TODO(cmc): This needs to be a refcounted type that can be easily be passed around: the ref has
30-
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
31-
// first, and this is not as straightforward as it seems.
32-
pub struct QueryEngine<'a> {
33-
pub store: &'a ChunkStore,
34-
pub cache: &'a QueryCache,
34+
#[derive(Clone)]
35+
pub struct QueryEngine<E: StorageEngineLike> {
36+
pub engine: E,
37+
}
38+
39+
impl QueryEngine<StorageEngine> {
40+
#[inline]
41+
pub fn new(store: ChunkStoreHandle, cache: QueryCacheHandle) -> Self {
42+
// Safety: EntityDb's handles can never be accessed from the outside, therefore these
43+
// handles had to have been constructed in an external context, outside of the main app.
44+
#[allow(unsafe_code)]
45+
let engine = unsafe { StorageEngine::new(store, cache) };
46+
47+
Self { engine }
48+
}
49+
50+
/// This will automatically instantiate a new empty [`QueryCache`].
51+
#[inline]
52+
pub fn from_store(store: ChunkStoreHandle) -> Self {
53+
Self::new(store.clone(), QueryCache::new_handle(store))
54+
}
55+
56+
/// Like [`ChunkStore::from_rrd_filepath`], but automatically instantiates [`QueryEngine`]s
57+
/// with new empty [`QueryCache`]s.
58+
#[inline]
59+
pub fn from_rrd_filepath(
60+
store_config: &ChunkStoreConfig,
61+
path_to_rrd: impl AsRef<std::path::Path>,
62+
version_policy: VersionPolicy,
63+
) -> anyhow::Result<BTreeMap<StoreId, Self>> {
64+
Ok(
65+
ChunkStore::handle_from_rrd_filepath(store_config, path_to_rrd, version_policy)?
66+
.into_iter()
67+
.map(|(store_id, store)| (store_id, Self::from_store(store)))
68+
.collect(),
69+
)
70+
}
3571
}
3672

37-
impl QueryEngine<'_> {
73+
impl<E: StorageEngineLike + Clone> QueryEngine<E> {
3874
/// Returns the full schema of the store.
3975
///
4076
/// This will include a column descriptor for every timeline and every component on every
@@ -45,7 +81,7 @@ impl QueryEngine<'_> {
4581
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
4682
#[inline]
4783
pub fn schema(&self) -> Vec<ColumnDescriptor> {
48-
self.store.schema()
84+
self.engine.with_store(|store| store.schema())
4985
}
5086

5187
/// Returns the filtered schema for the given [`QueryExpression`].
@@ -55,13 +91,14 @@ impl QueryEngine<'_> {
5591
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
5692
#[inline]
5793
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
58-
self.store.schema_for_query(query)
94+
self.engine
95+
.with_store(|store| store.schema_for_query(query))
5996
}
6097

6198
/// Starts a new query by instantiating a [`QueryHandle`].
6299
#[inline]
63-
pub fn query(&self, query: QueryExpression) -> QueryHandle<'_> {
64-
QueryHandle::new(self, query)
100+
pub fn query(&self, query: QueryExpression) -> QueryHandle<E> {
101+
QueryHandle::new(self.engine.clone(), query)
65102
}
66103

67104
/// Returns an iterator over all the [`EntityPath`]s present in the database.
@@ -70,9 +107,11 @@ impl QueryEngine<'_> {
70107
&self,
71108
filter: &'a EntityPathFilter,
72109
) -> impl Iterator<Item = EntityPath> + 'a {
73-
self.store
74-
.all_entities()
75-
.into_iter()
76-
.filter(|entity_path| filter.matches(entity_path))
110+
self.engine.with_store(|store| {
111+
store
112+
.all_entities()
113+
.into_iter()
114+
.filter(|entity_path| filter.matches(entity_path))
115+
})
77116
}
78117
}

crates/store/re_dataframe/src/lib.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
1212
pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk};
1313
#[doc(no_inline)]
1414
pub use self::external::re_chunk_store::{
15-
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
16-
SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
15+
ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange,
16+
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, VersionPolicy,
17+
ViewContentsSelector,
1718
};
1819
#[doc(no_inline)]
1920
pub use self::external::re_log_types::{
20-
EntityPath, EntityPathFilter, ResolvedTimeRange, TimeInt, Timeline,
21+
EntityPath, EntityPathFilter, ResolvedTimeRange, StoreKind, TimeInt, Timeline,
2122
};
2223
#[doc(no_inline)]
23-
pub use self::external::re_query::QueryCache;
24+
pub use self::external::re_query::{QueryCache, QueryCacheHandle, StorageEngine};
2425

2526
pub mod external {
2627
pub use re_chunk;

0 commit comments

Comments
 (0)