Skip to content

Implement safe storage handles #7934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5703,7 +5703,6 @@ dependencies = [
"re_build_info",
"re_chunk",
"re_chunk_store",
"re_dataframe",
"re_format",
"re_int_histogram",
"re_log",
Expand Down
5 changes: 5 additions & 0 deletions bacon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ command = [
need_stdout = false
watch = ["tests", "benches", "examples"]

[jobs.libs]
command = ["cargo", "clippy", "--lib", "--all-features", "--color=always"]
need_stdout = false
watch = ["tests", "benches", "examples"]

[jobs.wasm]
command = [
"cargo",
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ indent.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
parking_lot = { workspace = true, features = ["arc_lock"] }
thiserror.workspace = true
web-time.workspace = true

Expand Down
4 changes: 3 additions & 1 deletion crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ pub use self::dataframe::{
pub use self::events::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats};
pub use self::store::{ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ColumnMetadata};
pub use self::store::{
ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata,
};
pub use self::subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle};

pub(crate) use self::store::ColumnMetadataState;
Expand Down
91 changes: 88 additions & 3 deletions crates/store/re_chunk_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,58 @@ pub struct ChunkStoreGeneration {
gc_id: u64,
}

/// A ref-counted, interior-mutable handle to a [`ChunkStore`].
///
/// Cloning this handle is cheap: it merely bumps a reference count.
///
/// The lock behind this handle can also be acquired while _maintaining a static lifetime_, see:
/// * [`ChunkStoreHandle::read_arc`]
/// * [`ChunkStoreHandle::write_arc`]
#[derive(Clone)]
pub struct ChunkStoreHandle(Arc<parking_lot::RwLock<ChunkStore>>);

impl std::fmt::Display for ChunkStoreHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegates to the underlying store's `Display` impl; this grabs a read lock.
        write!(f, "{}", self.0.read())
    }
}

impl ChunkStoreHandle {
    /// Wraps `store` into a new ref-counted, lock-protected handle.
    #[inline]
    pub fn new(store: ChunkStore) -> Self {
        Self(Arc::new(parking_lot::RwLock::new(store)))
    }

    /// Consumes the handle, returning the underlying ref-counted lock.
    #[inline]
    pub fn into_inner(self) -> Arc<parking_lot::RwLock<ChunkStore>> {
        self.0
    }

    /// Acquires a read lock on the underlying [`ChunkStore`].
    #[inline]
    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
        self.0.read()
    }

    /// Acquires a write lock on the underlying [`ChunkStore`].
    #[inline]
    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
        self.0.write()
    }

    /// Acquires a read lock whose guard keeps the `Arc` alive, and therefore has a
    /// `'static` lifetime.
    #[inline]
    pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
        parking_lot::RwLock::read_arc(&self.0)
    }

    /// Acquires a write lock whose guard keeps the `Arc` alive, and therefore has a
    /// `'static` lifetime.
    #[inline]
    pub fn write_arc(
        &self,
    ) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
        parking_lot::RwLock::write_arc(&self.0)
    }
}

/// A complete chunk store: covers all timelines, all entities, everything.
///
/// The chunk store _always_ works at the chunk level, whether it is for write & read queries or
Expand Down Expand Up @@ -473,6 +525,7 @@ impl ChunkStore {
/// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
///
/// See also:
/// * [`ChunkStore::new`]
/// * [`ChunkStore::from_rrd_filepath`]
#[inline]
pub fn new(id: StoreId, config: ChunkStoreConfig) -> Self {
Expand All @@ -496,9 +549,20 @@ impl ChunkStore {
}
}

/// Instantiate a new empty `ChunkStore` with the given [`ChunkStoreConfig`].
///
/// Pre-wraps the result in a [`ChunkStoreHandle`].
///
/// See also:
/// * [`ChunkStore::from_rrd_filepath`]
#[inline]
pub fn id(&self) -> &StoreId {
&self.id
pub fn new_handle(id: StoreId, config: ChunkStoreConfig) -> ChunkStoreHandle {
ChunkStoreHandle::new(Self::new(id, config))
}

/// Returns this store's [`StoreId`].
///
/// NOTE(review): returned by value (a clone) rather than by reference — presumably so that
/// callers going through a [`ChunkStoreHandle`] don't have to keep the lock guard alive just
/// to hold on to the id; confirm against the handle-based call sites.
#[inline]
pub fn id(&self) -> StoreId {
    self.id.clone()
}

#[inline]
Expand Down Expand Up @@ -591,7 +655,7 @@ impl ChunkStore {
impl ChunkStore {
/// Instantiate a new `ChunkStore` with the given [`ChunkStoreConfig`].
///
/// The store will be prefilled using the data at the specified path.
/// The stores will be prefilled with the data at the specified path.
///
/// See also:
/// * [`ChunkStore::new`]
Expand Down Expand Up @@ -650,4 +714,25 @@ impl ChunkStore {

Ok(stores)
}

/// Instantiates new `ChunkStore`s with the given [`ChunkStoreConfig`], wrapping each result
/// in a [`ChunkStoreHandle`].
///
/// The stores will be prefilled with the data at the specified path.
///
/// See also:
/// * [`ChunkStore::new_handle`]
pub fn handle_from_rrd_filepath(
    store_config: &ChunkStoreConfig,
    path_to_rrd: impl AsRef<std::path::Path>,
    version_policy: crate::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
    let stores = Self::from_rrd_filepath(store_config, path_to_rrd, version_policy)?;

    let mut handles = BTreeMap::new();
    for (store_id, store) in stores {
        handles.insert(store_id, ChunkStoreHandle::new(store));
    }

    Ok(handles)
}
}
22 changes: 7 additions & 15 deletions crates/store/re_dataframe/examples/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

use itertools::Itertools;

use re_chunk::TimeInt;
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, QueryExpression, SparseFillStrategy, Timeline, VersionPolicy,
use re_dataframe::{
ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange,
SparseFillStrategy, StoreKind, TimeInt, Timeline, VersionPolicy,
};
use re_dataframe::{QueryCache, QueryEngine};
use re_log_types::{EntityPathFilter, ResolvedTimeRange, StoreKind};

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
Expand Down Expand Up @@ -42,27 +40,21 @@ fn main() -> anyhow::Result<()> {
_ => Timeline::new_temporal(timeline_name),
};

let stores = ChunkStore::from_rrd_filepath(
let engines = QueryEngine::from_rrd_filepath(
&ChunkStoreConfig::DEFAULT,
path_to_rrd,
VersionPolicy::Warn,
)?;

for (store_id, store) in &stores {
for (store_id, engine) in &engines {
if store_id.kind != StoreKind::Recording {
continue;
}

let query_cache = QueryCache::new(store);
let query_engine = QueryEngine {
store,
cache: &query_cache,
};

let query = QueryExpression {
filtered_index: Some(timeline),
view_contents: Some(
query_engine
engine
.iter_entity_paths(&entity_path_filter)
.map(|entity_path| (entity_path, None))
.collect(),
Expand All @@ -73,7 +65,7 @@ fn main() -> anyhow::Result<()> {
};
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
let query_handle = engine.query(query.clone());
// eprintln!("{:#?}", query_handle.selected_contents());
for batch in query_handle.into_batch_iter() {
eprintln!("{batch}");
Expand Down
79 changes: 59 additions & 20 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::collections::BTreeMap;

use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
use re_log_types::EntityPathFilter;
use re_query::QueryCache;
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression,
VersionPolicy,
};
use re_log_types::{EntityPathFilter, StoreId};
use re_query::{QueryCache, QueryCacheHandle, StorageEngine, StorageEngineLike};

use crate::QueryHandle;

Expand All @@ -15,26 +20,57 @@ use re_chunk_store::ComponentColumnDescriptor;
// `TransportChunk` type until we migrate to `arrow-rs`.
// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime)
// will be trivial.
// TODO(cmc): add an `arrow` feature to transportchunk in a follow-up pr and call it a day.
pub type RecordBatch = TransportChunk;

// --- Queries ---

/// A handle to our user-facing query engine.
///
/// Cheap to clone.
///
/// See the following methods:
/// * [`QueryEngine::schema`]: get the complete schema of the recording.
/// * [`QueryEngine::query`]: execute a [`QueryExpression`] on the recording.
//
// TODO(cmc): This needs to be a refcounted type that can be easily be passed around: the ref has
// got to go. But for that we need to generally introduce `ChunkStoreHandle` and `QueryCacheHandle`
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
pub store: &'a ChunkStore,
pub cache: &'a QueryCache,
#[derive(Clone)]
pub struct QueryEngine<E: StorageEngineLike> {
pub engine: E,
}

impl QueryEngine<StorageEngine> {
    /// Creates a new [`QueryEngine`] from pre-existing store and cache handles.
    ///
    /// Prefer [`QueryEngine::from_store`] or [`QueryEngine::from_rrd_filepath`] unless you
    /// already have both handles at hand.
    #[inline]
    pub fn new(store: ChunkStoreHandle, cache: QueryCacheHandle) -> Self {
        // SAFETY: `EntityDb`'s handles can never be accessed from the outside, therefore these
        // handles had to have been constructed in an external context, outside of the main app.
        // NOTE(review): this justification lives in `StorageEngine::new`'s safety contract —
        // verify it still holds if `EntityDb` ever starts exposing its handles.
        #[allow(unsafe_code)]
        let engine = unsafe { StorageEngine::new(store, cache) };

        Self { engine }
    }

    /// Creates a new [`QueryEngine`] from a store handle alone.
    ///
    /// This will automatically instantiate a new empty [`QueryCache`].
    #[inline]
    pub fn from_store(store: ChunkStoreHandle) -> Self {
        Self::new(store.clone(), QueryCache::new_handle(store))
    }

    /// Like [`ChunkStore::from_rrd_filepath`], but automatically instantiates [`QueryEngine`]s
    /// with new empty [`QueryCache`]s.
    ///
    /// Returns one engine per store found in the recording at `path_to_rrd`.
    #[inline]
    pub fn from_rrd_filepath(
        store_config: &ChunkStoreConfig,
        path_to_rrd: impl AsRef<std::path::Path>,
        version_policy: VersionPolicy,
    ) -> anyhow::Result<BTreeMap<StoreId, Self>> {
        Ok(
            ChunkStore::handle_from_rrd_filepath(store_config, path_to_rrd, version_policy)?
                .into_iter()
                .map(|(store_id, store)| (store_id, Self::from_store(store)))
                .collect(),
        )
    }
}

impl QueryEngine<'_> {
impl<E: StorageEngineLike + Clone> QueryEngine<E> {
/// Returns the full schema of the store.
///
/// This will include a column descriptor for every timeline and every component on every
Expand All @@ -45,7 +81,7 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
self.store.schema()
self.engine.with_store(|store| store.schema())
}

/// Returns the filtered schema for the given [`QueryExpression`].
Expand All @@ -55,13 +91,14 @@ impl QueryEngine<'_> {
/// * second, the component columns in lexical order (`Color`, `Radius, ...`).
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
self.store.schema_for_query(query)
self.engine
.with_store(|store| store.schema_for_query(query))
}

/// Starts a new query by instantiating a [`QueryHandle`].
#[inline]
pub fn query(&self, query: QueryExpression) -> QueryHandle<'_> {
QueryHandle::new(self, query)
pub fn query(&self, query: QueryExpression) -> QueryHandle<E> {
QueryHandle::new(self.engine.clone(), query)
}

/// Returns an iterator over all the [`EntityPath`]s present in the database.
Expand All @@ -70,9 +107,11 @@ impl QueryEngine<'_> {
&self,
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
self.store
.all_entities()
.into_iter()
.filter(|entity_path| filter.matches(entity_path))
self.engine.with_store(|store| {
store
.all_entities()
.into_iter()
.filter(|entity_path| filter.matches(entity_path))
})
}
}
9 changes: 5 additions & 4 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ pub use self::external::arrow2::chunk::Chunk as ArrowChunk;
pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk};
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange,
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, VersionPolicy,
ViewContentsSelector,
};
#[doc(no_inline)]
pub use self::external::re_log_types::{
EntityPath, EntityPathFilter, ResolvedTimeRange, TimeInt, Timeline,
EntityPath, EntityPathFilter, ResolvedTimeRange, StoreKind, TimeInt, Timeline,
};
#[doc(no_inline)]
pub use self::external::re_query::QueryCache;
pub use self::external::re_query::{QueryCache, QueryCacheHandle, StorageEngine};

pub mod external {
pub use re_chunk;
Expand Down
Loading
Loading