Skip to content

Commit ccfd21a

Browse files
authored
Primary caching 8: implement latest-at data-time cache entry deduplication (#4712)
Introduces the notion of cache deduplication: given a query at time `4` and a query at time `8` that both return data at time `2`, they must share a single cache entry. I.e. starting with this PR, scrubbing through the OPF example will not result in more cache memory being used. --- Part of the primary caching series of PRs (index search, joins, deserialization): - #4592 - #4593 - #4659 - #4680 - #4681 - #4698 - #4711 - #4712 - #4721 - #4726 - #4773 - #4784 - #4785 - #4793 - #4800
1 parent 8dd1681 commit ccfd21a

File tree

4 files changed

+160
-58
lines changed

4 files changed

+160
-58
lines changed

crates/re_query_cache/src/cache.rs

+24-31
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,10 @@ impl CacheKey {
149149
/// of data.
150150
#[derive(Default)]
151151
pub struct CacheBucket {
152-
/// The timestamps and [`RowId`]s of all cached rows.
152+
/// The _data_ timestamps and [`RowId`]s of all cached rows.
153153
///
154154
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
155-
pub(crate) pov_times: VecDeque<(TimeInt, RowId)>,
155+
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,
156156

157157
/// The [`InstanceKey`]s of the point-of-view components.
158158
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
@@ -170,8 +170,8 @@ pub struct CacheBucket {
170170
impl CacheBucket {
171171
/// Iterate over the timestamps of the point-of-view components.
172172
#[inline]
173-
pub fn iter_pov_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
174-
self.pov_times.iter()
173+
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
174+
self.pov_data_times.iter()
175175
}
176176

177177
/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
@@ -207,7 +207,7 @@ impl CacheBucket {
207207
/// How many timestamps' worth of data is stored in this bucket?
208208
#[inline]
209209
pub fn num_entries(&self) -> usize {
210-
self.pov_times.len()
210+
self.pov_data_times.len()
211211
}
212212

213213
#[inline]
@@ -236,15 +236,15 @@ macro_rules! impl_insert {
236236
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));
237237

238238
let Self {
239-
pov_times,
239+
pov_data_times,
240240
pov_instance_keys,
241241
components: _,
242242
} = self;
243243

244244
let pov_row_id = arch_view.primary_row_id();
245-
let index = pov_times.partition_point(|t| t < &(query_time, pov_row_id));
245+
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));
246246

247-
pov_times.insert(index, (query_time, pov_row_id));
247+
pov_data_times.insert(index, (query_time, pov_row_id));
248248
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
249249
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
250250
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
@@ -332,30 +332,23 @@ impl CacheBucket {
332332
// which is notoriously painful in Rust (i.e., macros).
333333
// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.
334334

335-
/// Caches the results of `LatestAt` queries.
335+
/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`).
336336
///
337-
/// The `TimeInt` in the index corresponds to the timestamp of the query, _not_ the timestamp of
338-
/// the resulting data!
339-
//
340-
// TODO(cmc): we need an extra indirection layer so that cached entries can be shared across
341-
// queries with different query timestamps but identical data timestamps.
342-
// This requires keeping track of all `RowId`s in `ArchetypeView`, not just the `RowId` of the
343-
// point-of-view component.
337+
/// There is one `LatestAtCache` for each unique [`CacheKey`].
338+
///
339+
/// All query steps are cached: index search, cluster key joins and deserialization.
344340
#[derive(Default)]
345-
pub struct LatestAtCache(BTreeMap<TimeInt, CacheBucket>);
346-
347-
impl std::ops::Deref for LatestAtCache {
348-
type Target = BTreeMap<TimeInt, CacheBucket>;
349-
350-
#[inline]
351-
fn deref(&self) -> &Self::Target {
352-
&self.0
353-
}
354-
}
341+
pub struct LatestAtCache {
342+
/// Organized by _query_ time.
343+
///
344+
/// If the data you're looking for isn't in here, try partially running the query (i.e. run the
345+
/// index search in order to find a data time, but don't actually deserialize and join the data)
346+
/// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
347+
pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
355348

356-
impl std::ops::DerefMut for LatestAtCache {
357-
#[inline]
358-
fn deref_mut(&mut self) -> &mut Self::Target {
359-
&mut self.0
360-
}
349+
/// Organized by _data_ time.
350+
///
351+
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
352+
/// can result in a data time of `T`.
353+
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
361354
}

crates/re_query_cache/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub use self::query::{
1010
query_archetype_pov1, query_archetype_with_history_pov1, MaybeCachedComponentData,
1111
};
1212

13-
pub(crate) use self::cache::LatestAtCache;
13+
pub(crate) use self::cache::{CacheBucket, LatestAtCache};
1414

1515
pub use re_query::{QueryError, Result}; // convenience
1616

crates/re_query_cache/src/query.rs

+68-26
Original file line numberDiff line numberDiff line change
@@ -106,35 +106,12 @@ macro_rules! impl_query_archetype {
106106
format!("cached={cached} arch={} pov={} comp={}", A::name(), $N, $M)
107107
);
108108

109-
let mut latest_at_callback = |query: &LatestAtQuery, cache: &mut crate::LatestAtCache| {
110-
re_tracing::profile_scope!("latest_at", format!("{query:?}"));
111-
112-
let bucket = cache.entry(query.at).or_default();
113-
// NOTE: Implicitly dropping the write guard here: the LatestAtCache is free once again!
114-
115-
if bucket.is_empty() {
116-
re_tracing::profile_scope!("fill");
117-
118-
let now = web_time::Instant::now();
119-
// TODO(cmc): cache deduplication.
120-
let arch_view = query_archetype::<A>(store, &query, entity_path)?;
121-
122-
bucket.[<insert_pov $N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;
123-
124-
let elapsed = now.elapsed();
125-
::re_log::trace!(
126-
store_id=%store.id(),
127-
%entity_path,
128-
archetype=%A::name(),
129-
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
130-
1f64 / elapsed.as_secs_f64()
131-
);
132-
}
133109

110+
let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
134111
re_tracing::profile_scope!("iter");
135112

136113
let it = itertools::izip!(
137-
bucket.iter_pov_times(),
114+
bucket.iter_pov_data_times(),
138115
bucket.iter_pov_instance_keys(),
139116
$(bucket.iter_component::<$pov>()
140117
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
@@ -156,6 +133,71 @@ macro_rules! impl_query_archetype {
156133
Ok(())
157134
};
158135

136+
let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
137+
re_tracing::profile_scope!("latest_at", format!("{query:?}"));
138+
139+
let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;
140+
141+
let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
142+
std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => {
143+
// Fastest path: we have an entry for this exact query time, no need to look any
144+
// further.
145+
return iter_results(&query_time_bucket_at_query_time.get().read());
146+
}
147+
entry @ std::collections::btree_map::Entry::Vacant(_) => entry,
148+
};
149+
150+
let arch_view = query_archetype::<A>(store, &query, entity_path)?;
151+
// TODO(cmc): actual timeless caching support.
152+
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);
153+
154+
// Fast path: we've run the query and realized that we already have the data for the resulting
155+
// _data_ time, so let's use that to avoid join & deserialization costs.
156+
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
157+
*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
158+
159+
// We now know for a fact that a query at that data time would yield the same
160+
// results: copy the bucket accordingly so that the next cache hit for that query
161+
// time ends up taking the fastest path.
162+
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
163+
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
164+
165+
return iter_results(&data_time_bucket_at_data_time.read());
166+
}
167+
168+
let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default();
169+
170+
// Slowest path: this is a complete cache miss.
171+
{
172+
re_tracing::profile_scope!("fill");
173+
174+
// Grabbing the current time is quite costly on web.
175+
#[cfg(not(target_arch = "wasm32"))]
176+
let now = web_time::Instant::now();
177+
178+
let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
179+
query_time_bucket_at_query_time.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;
180+
181+
#[cfg(not(target_arch = "wasm32"))]
182+
{
183+
let elapsed = now.elapsed();
184+
::re_log::trace!(
185+
store_id=%store.id(),
186+
%entity_path,
187+
archetype=%A::name(),
188+
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
189+
1f64 / elapsed.as_secs_f64()
190+
);
191+
}
192+
}
193+
194+
let data_time_bucket_at_data_time = per_data_time.entry(data_time);
195+
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);
196+
197+
iter_results(&query_time_bucket_at_query_time.read())
198+
};
199+
200+
159201
match &query {
160202
// TODO(cmc): cached range support
161203
AnyQuery::Range(query) => {
@@ -203,7 +245,7 @@ macro_rules! impl_query_archetype {
203245
store.id().clone(),
204246
entity_path.clone(),
205247
query,
206-
|cache| latest_at_callback(query, cache),
248+
|latest_at_cache| latest_at_callback(query, latest_at_cache),
207249
)
208250
},
209251
}

crates/re_query_cache/tests/latest_at.rs

+67
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,73 @@ fn invalidation() {
209209
query_and_compare(&store, &query, &ent_path.into());
210210
}
211211

212+
// Test the following scenario:
213+
// ```py
214+
// rr.set_time(0)
215+
// rr.log("points", rr.Points3D([1, 2, 3]))
216+
//
217+
// # Do first query here: LatestAt(+inf)
218+
// # Expected: points=[[1,2,3]] colors=[]
219+
//
220+
// rr.set_time(1)
221+
// rr.log_components("points", rr.components.Color(0xFF0000))
222+
//
223+
// # Do second query here: LatestAt(+inf)
224+
// # Expected: points=[[1,2,3]] colors=[0xFF0000]
225+
//
226+
// rr.set_time(2)
227+
// rr.log_components("points", rr.components.Color(0x0000FF))
228+
//
229+
// # Do third query here: LatestAt(+inf)
230+
// # Expected: points=[[1,2,3]] colors=[0x0000FF]
231+
// ```
232+
//
233+
// TODO(cmc): this needs proper invalidation to pass
234+
#[should_panic(expected = "assertion failed: `(left == right)`")]
235+
#[test]
236+
fn invalidation_of_future_optionals() {
237+
let mut store = DataStore::new(
238+
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
239+
InstanceKey::name(),
240+
Default::default(),
241+
);
242+
243+
let ent_path = "points";
244+
245+
let frame1 = [build_frame_nr(1.into())];
246+
let frame2 = [build_frame_nr(2.into())];
247+
let frame3 = [build_frame_nr(3.into())];
248+
249+
let query_time = [build_frame_nr(9999.into())];
250+
251+
let positions = vec![Position2D::new(1.0, 2.0), Position2D::new(3.0, 4.0)];
252+
let row = DataRow::from_cells1_sized(RowId::new(), ent_path, frame1, 2, positions).unwrap();
253+
store.insert_row(&row).unwrap();
254+
255+
let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
256+
query_and_compare(&store, &query, &ent_path.into());
257+
258+
let color_instances = vec![InstanceKey::SPLAT];
259+
let colors = vec![Color::from_rgb(255, 0, 0)];
260+
let row =
261+
DataRow::from_cells2_sized(RowId::new(), ent_path, frame2, 1, (color_instances, colors))
262+
.unwrap();
263+
store.insert_row(&row).unwrap();
264+
265+
let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
266+
query_and_compare(&store, &query, &ent_path.into());
267+
268+
let color_instances = vec![InstanceKey::SPLAT];
269+
let colors = vec![Color::from_rgb(0, 0, 255)];
270+
let row =
271+
DataRow::from_cells2_sized(RowId::new(), ent_path, frame3, 1, (color_instances, colors))
272+
.unwrap();
273+
store.insert_row(&row).unwrap();
274+
275+
let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
276+
query_and_compare(&store, &query, &ent_path.into());
277+
}
278+
212279
// ---
213280

214281
fn query_and_compare(store: &DataStore, query: &LatestAtQuery, ent_path: &EntityPath) {

0 commit comments

Comments
 (0)