Implement support for fully asynchronous QueryHandles #7964

Merged · 6 commits · Oct 31, 2024

28 changes: 26 additions & 2 deletions crates/store/re_chunk_store/src/store.rs
@@ -341,17 +341,34 @@ impl ChunkStoreHandle
impl ChunkStoreHandle {
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, ChunkStore> {
self.0.read()
self.0.read_recursive()
}

#[inline]
pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, ChunkStore>> {
self.0.try_read_recursive()
}

#[inline]
pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, ChunkStore> {
self.0.write()
}

#[inline]
pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, ChunkStore>> {
self.0.try_write()
}

#[inline]
pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore> {
parking_lot::RwLock::read_arc(&self.0)
parking_lot::RwLock::read_arc_recursive(&self.0)
}

#[inline]
pub fn try_read_arc(
&self,
) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, ChunkStore>> {
parking_lot::RwLock::try_read_recursive_arc(&self.0)
}

#[inline]
@@ -360,6 +377,13 @@ impl ChunkStoreHandle {
) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore> {
parking_lot::RwLock::write_arc(&self.0)
}

#[inline]
pub fn try_write_arc(
&self,
) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, ChunkStore>> {
parking_lot::RwLock::try_write_arc(&self.0)
}
}

/// A complete chunk store: covers all timelines, all entities, everything.
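The handle changes above switch the shared accessors to `read_recursive`/`read_arc_recursive` and add non-blocking `try_*` variants. Below is a minimal, standalone sketch using `parking_lot` directly (not the actual `ChunkStoreHandle`) of why that matters: a recursive read never deadlocks against a queued writer when the same thread already holds a read guard, and the `try_*` methods return `None` instead of blocking, which is the primitive the asynchronous query path is built on.

```rust
use parking_lot::RwLock;

fn main() {
    let lock = RwLock::new(vec![1_u32, 2, 3]);

    // With plain `read()`, a second read on the same thread can deadlock if a
    // writer queues up in between; `read_recursive()` ignores queued writers.
    let outer = lock.read_recursive();
    let inner = lock.read_recursive();
    assert_eq!(outer.len(), inner.len());
    drop((outer, inner));

    // The `try_*` variants never block: `None` just means "contended right now".
    match lock.try_write() {
        Some(mut guard) => guard.push(4),
        None => println!("lock is contended; the caller can yield instead of blocking"),
    }
    assert!(lock.try_read_recursive().is_some());
}
```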
6 changes: 3 additions & 3 deletions crates/store/re_dataframe/src/engine.rs
@@ -81,7 +81,7 @@ impl<E: StorageEngineLike + Clone> QueryEngine<E> {
/// * second, the component columns in lexical order (`Color`, `Radius`, ...).
#[inline]
pub fn schema(&self) -> Vec<ColumnDescriptor> {
self.engine.with_store(|store| store.schema())
self.engine.with(|store, _cache| store.schema())
}

/// Returns the filtered schema for the given [`QueryExpression`].
@@ -92,7 +92,7 @@
#[inline]
pub fn schema_for_query(&self, query: &QueryExpression) -> Vec<ColumnDescriptor> {
self.engine
.with_store(|store| store.schema_for_query(query))
.with(|store, _cache| store.schema_for_query(query))
}

/// Starts a new query by instantiating a [`QueryHandle`].
@@ -107,7 +107,7 @@
&self,
filter: &'a EntityPathFilter,
) -> impl Iterator<Item = EntityPath> + 'a {
self.engine.with_store(|store| {
self.engine.with(|store, _cache| {
store
.all_entities()
.into_iter()
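In `engine.rs`, the separate `with_store`/`with_cache` accessors are replaced by a single `with(|store, cache| …)` call, and `query.rs` additionally relies on a fallible `try_with`. The trait below is only a trimmed-down sketch of what such a `StorageEngineLike`-style contract could look like (the real trait lives in `re_dataframe` and may differ): one closure gets a consistent view of both the store and the cache, and the `try_` variant returns `None` instead of blocking when either lock is contended.

```rust
use std::sync::Arc;

use parking_lot::RwLock;

// Stand-ins for the real `ChunkStore` and `QueryCache`.
struct ChunkStore;
struct QueryCache;

/// Hypothetical, simplified version of a `StorageEngineLike`-style contract.
trait StorageEngineLike {
    /// Runs `f` with read access to both the store and the cache, blocking if necessary.
    fn with<R>(&self, f: impl FnOnce(&ChunkStore, &QueryCache) -> R) -> R;

    /// Non-blocking variant: `None` means one of the locks is currently contended.
    fn try_with<R>(&self, f: impl FnOnce(&ChunkStore, &QueryCache) -> R) -> Option<R>;
}

struct Engine {
    store: Arc<RwLock<ChunkStore>>,
    cache: Arc<RwLock<QueryCache>>,
}

impl StorageEngineLike for Engine {
    fn with<R>(&self, f: impl FnOnce(&ChunkStore, &QueryCache) -> R) -> R {
        // Recursive reads keep nested `with` calls from deadlocking against writers.
        let store = self.store.read_recursive();
        let cache = self.cache.read_recursive();
        f(&store, &cache)
    }

    fn try_with<R>(&self, f: impl FnOnce(&ChunkStore, &QueryCache) -> R) -> Option<R> {
        let store = self.store.try_read_recursive()?;
        let cache = self.cache.try_read_recursive()?;
        Some(f(&store, &cache))
    }
}
```

With this shape, `QueryEngine::schema` and friends become one-liners over `with`, while the asynchronous row iteration can fall back to `try_with` to avoid blocking an executor thread.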
89 changes: 74 additions & 15 deletions crates/store/re_dataframe/src/query.rs
@@ -157,10 +157,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {
///
/// It is important that query handles stay cheap to create.
fn init(&self) -> &QueryHandleState {
self.state.get_or_init(|| {
self.engine
.with_store(|store| self.engine.with_cache(|cache| self.init_(store, cache)))
})
self.engine
.with(|store, cache| self.state.get_or_init(|| self.init_(store, cache)))
}

// NOTE: This is split in its own method otherwise it completely breaks `rustfmt`.
@@ -216,15 +214,16 @@ impl<E: StorageEngineLike> QueryHandle<E> {
.keep_extra_timelines(true) // we want all the timelines we can get!
.keep_extra_components(false)
};
let (view_pov_chunks_idx, mut view_chunks) = self.fetch_view_chunks(&query, &view_contents);
let (view_pov_chunks_idx, mut view_chunks) =
self.fetch_view_chunks(store, cache, &query, &view_contents);

// 5. Collect all relevant clear chunks and update the view accordingly.
//
// We'll turn the clears into actual empty arrays of the expected component type.
{
re_tracing::profile_scope!("clear_chunks");

let clear_chunks = self.fetch_clear_chunks(&query, &view_contents);
let clear_chunks = self.fetch_clear_chunks(store, cache, &query, &view_contents);
for (view_idx, chunks) in view_chunks.iter_mut().enumerate() {
let Some(ColumnDescriptor::Component(descr)) = view_contents.get(view_idx) else {
continue;
@@ -443,6 +442,8 @@ impl<E: StorageEngineLike> QueryHandle<E> {

fn fetch_view_chunks(
&self,
store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
view_contents: &[ColumnDescriptor],
) -> (Option<usize>, Vec<Vec<(AtomicU64, Chunk)>>) {
@@ -456,7 +457,13 @@

ColumnDescriptor::Component(column) => {
let chunks = self
.fetch_chunks(query, &column.entity_path, [column.component_name])
.fetch_chunks(
store,
cache,
query,
&column.entity_path,
[column.component_name],
)
.unwrap_or_default();

if let Some(pov) = self.query.filtered_is_not_null.as_ref() {
@@ -481,6 +488,8 @@
/// The component data is stripped out, only the indices are left.
fn fetch_clear_chunks(
&self,
store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
view_contents: &[ColumnDescriptor],
) -> IntMap<EntityPath, Vec<Chunk>> {
@@ -544,7 +553,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
// For the entity itself, any chunk that contains clear data is relevant, recursive or not.
// Just fetch everything we find.
let flat_chunks = self
.fetch_chunks(query, entity_path, component_names)
.fetch_chunks(store, cache, query, entity_path, component_names)
.map(|chunks| {
chunks
.into_iter()
@@ -555,7 +564,7 @@

let recursive_chunks =
entity_path_ancestors(entity_path).flat_map(|ancestor_path| {
self.fetch_chunks(query, &ancestor_path, component_names)
self.fetch_chunks(store, cache, query, &ancestor_path, component_names)
.into_iter() // option
.flat_map(|chunks| chunks.into_iter().map(|(_cursor, chunk)| chunk))
// NOTE: Ancestors' chunks are only relevant for the rows where `ClearIsRecursive=true`.
@@ -577,6 +586,8 @@

fn fetch_chunks<const N: usize>(
&self,
_store: &ChunkStore,
cache: &QueryCache,
query: &RangeQuery,
entity_path: &EntityPath,
component_names: [ComponentName; N],
@@ -587,9 +598,7 @@
//
// TODO(cmc): Going through the cache is very useful in a Viewer context, but
// not so much in an SDK context. Make it configurable.
let results = self
.engine
.with_cache(|cache| cache.range(query, entity_path, component_names));
let results = cache.range(query, entity_path, component_names);

debug_assert!(
results.components.len() <= 1,
@@ -783,10 +792,43 @@
/// ```
#[inline]
pub fn next_row(&self) -> Option<Vec<Box<dyn ArrowArray>>> {
self.engine.with_cache(|cache| self._next_row(cache))
self.engine
.with(|store, cache| self._next_row(store, cache))
}

/// Asynchronously returns the next row's worth of data.
///
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
/// the index, for each respective `ColumnDescriptor`.
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
///
/// Example:
/// ```ignore
/// while let Some(row) = query_handle.next_row_async().await {
/// // …
/// }
/// ```
pub fn next_row_async(
&self,
) -> impl std::future::Future<Output = Option<Vec<Box<dyn ArrowArray>>>> {
let res: Option<Option<_>> = self
.engine
.try_with(|store, cache| self._next_row(store, cache));

std::future::poll_fn(move |_cx| match &res {
Some(row) => std::task::Poll::Ready(row.clone()),
None => std::task::Poll::Pending,
})
}

pub fn _next_row(&self, cache: &QueryCache) -> Option<Vec<Box<dyn ArrowArray>>> {
pub fn _next_row(
&self,
store: &ChunkStore,
cache: &QueryCache,
) -> Option<Vec<Box<dyn ArrowArray>>> {
re_tracing::profile_function!();

/// Temporary state used to resolve the streaming join for the current iteration.
@@ -816,7 +858,10 @@
Retrofilled(UnitChunkShared),
}

let state = self.init();
// Although that's a synchronous lock, we probably don't need to worry about it until
// there is proof to the contrary: we are in a specific `QueryHandle` after all, there's
// really no good reason to be contending here in the first place.
let state = self.state.get_or_init(move || self.init_(store, cache));

let row_idx = state.cur_row.fetch_add(1, Ordering::Relaxed);
let cur_index_value = state.unique_index_values.get(row_idx as usize)?;
@@ -1160,6 +1205,20 @@
data: ArrowChunk::new(self.next_row()?),
})
}

#[inline]
pub async fn next_row_batch_async(&self) -> Option<RecordBatch> {
let row = self.next_row_async().await?;

// If we managed to get a row, then the state must be initialized already.
#[allow(clippy::unwrap_used)]
let schema = self.state.get().unwrap().arrow_schema.clone();

Some(RecordBatch {
schema,
data: ArrowChunk::new(row),
})
}
}

impl<E: StorageEngineLike> QueryHandle<E> {
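`next_row_async` builds its future with `std::future::poll_fn` on top of the non-blocking `try_with`. The sketch below shows the general shape of that pattern with plain `parking_lot` (it is not the Rerun implementation); for simplicity it wakes itself on every `Pending` poll, whereas a production version would rather park on a proper notification.

```rust
use std::future::poll_fn;
use std::task::Poll;

use parking_lot::RwLock;

/// Resolves once the lock can be read without blocking, yielding a clone of the value.
async fn read_when_free<T: Clone>(lock: &RwLock<T>) -> T {
    poll_fn(|cx| match lock.try_read_recursive() {
        Some(guard) => Poll::Ready((*guard).clone()),
        None => {
            // Busy-poll: ask the executor to try again soon. A real implementation
            // would register a proper wake-up instead of spinning.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}
```

A consumer of the new API can then drain a handle without blocking the runtime, e.g. `while let Some(batch) = query_handle.next_row_batch_async().await { /* … */ }`.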
28 changes: 26 additions & 2 deletions crates/store/re_query/src/cache.rs
@@ -93,17 +93,34 @@ impl QueryCacheHandle {
impl QueryCacheHandle {
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, QueryCache> {
self.0.read()
self.0.read_recursive()
}

#[inline]
pub fn try_read(&self) -> Option<parking_lot::RwLockReadGuard<'_, QueryCache>> {
self.0.try_read_recursive()
}

#[inline]
pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, QueryCache> {
self.0.write()
}

#[inline]
pub fn try_write(&self) -> Option<parking_lot::RwLockWriteGuard<'_, QueryCache>> {
self.0.try_write()
}

#[inline]
pub fn read_arc(&self) -> parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache> {
parking_lot::RwLock::read_arc(&self.0)
parking_lot::RwLock::read_arc_recursive(&self.0)
}

#[inline]
pub fn try_read_arc(
&self,
) -> Option<parking_lot::ArcRwLockReadGuard<parking_lot::RawRwLock, QueryCache>> {
parking_lot::RwLock::try_read_recursive_arc(&self.0)
}

#[inline]
@@ -112,6 +129,13 @@
) -> parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache> {
parking_lot::RwLock::write_arc(&self.0)
}

#[inline]
pub fn try_write_arc(
&self,
) -> Option<parking_lot::ArcRwLockWriteGuard<parking_lot::RawRwLock, QueryCache>> {
parking_lot::RwLock::try_write_arc(&self.0)
}
}

pub struct QueryCache {
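`QueryCacheHandle` gains the same recursive and `try_*`/`*_arc` accessors as `ChunkStoreHandle`. The `*_arc` variants deserve a note: they return guards that own an `Arc` to the lock, so they carry no borrow lifetime and can be stored in other structures. A minimal sketch with plain `parking_lot` (requires its `arc_lock` feature; placeholder types, not the Rerun handles):

```rust
use std::sync::Arc;

use parking_lot::{ArcRwLockReadGuard, RawRwLock, RwLock};

// Placeholder for the real `QueryCache`.
type Cache = Vec<u64>;

/// Keeps the cache read-locked for as long as the snapshot lives; no lifetime
/// parameter is needed because the guard owns an `Arc` to the lock.
struct CacheSnapshot {
    guard: ArcRwLockReadGuard<RawRwLock, Cache>,
}

/// Non-blocking: returns `None` if the cache is currently write-locked.
fn try_snapshot(cache: &Arc<RwLock<Cache>>) -> Option<CacheSnapshot> {
    RwLock::try_read_recursive_arc(cache).map(|guard| CacheSnapshot { guard })
}

fn main() {
    let cache = Arc::new(RwLock::new(vec![1, 2, 3]));
    if let Some(snapshot) = try_snapshot(&cache) {
        println!("cache holds {} entries", snapshot.guard.len());
    }
}
```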