Commit 36446b6

Primary caching 18: range invalidation (ENABLED BY DEFAULT 🎊) (#4853)
Implement range invalidation and do a quality pass over all the size-tracking machinery in the cache. **Range caching is now enabled by default!**

- Fixes #4809
- Fixes #374

---

Part of the primary caching series of PRs (index search, joins, deserialization):

- #4592
- #4593
- #4659
- #4680
- #4681
- #4698
- #4711
- #4712
- #4721
- #4726
- #4773
- #4784
- #4785
- #4793
- #4800
- #4851
- #4852
- #4853
- #4856
1 parent 5800d1a commit 36446b6
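
The core mechanism completed here, per the doc comments in the diff below: invalidation is not applied on ingestion but deferred to the next query, where the frame time acts as a natural micro-batching window; and since the cached data is sorted by time, each batch reduces to a single truncation. A minimal, self-contained sketch of that pattern follows — all names (`SortedCache`, `invalidate_from`, `handle_pending_invalidation`) are hypothetical stand-ins, not Rerun's API, and the coalescing on ingestion is an assumption since this diff doesn't show how the pending threshold gets recorded:

```rust
// A minimal sketch of deferred invalidation, standard library only.
// `SortedCache` and its methods are hypothetical, not Rerun's API.
use std::collections::VecDeque;

/// A cache of (time, payload) entries kept sorted by time.
struct SortedCache {
    entries: VecDeque<(i64, String)>,
    /// Earliest time invalidated since the last query, if any.
    pending_invalidation: Option<i64>,
}

impl SortedCache {
    /// Called on ingestion: don't touch the data yet, just record the threshold.
    fn invalidate_from(&mut self, time: i64) {
        self.pending_invalidation = Some(match self.pending_invalidation {
            Some(t) => t.min(time),
            None => time,
        });
    }

    /// Called at query time: apply all pending invalidations in one batch.
    /// Returns how many entries were dropped.
    fn handle_pending_invalidation(&mut self) -> usize {
        let Some(threshold) = self.pending_invalidation.take() else {
            return 0;
        };
        // Entries are sorted by time, so everything at or after `threshold`
        // forms a suffix: find where it starts and truncate.
        let idx = self.entries.partition_point(|(t, _)| *t < threshold);
        let removed = self.entries.len() - idx;
        self.entries.truncate(idx);
        removed
    }
}

fn main() {
    let mut cache = SortedCache {
        entries: (0..5).map(|t| (t, format!("frame {t}"))).collect(),
        pending_invalidation: None,
    };
    cache.invalidate_from(3);
    cache.invalidate_from(2); // coalesces: one batch, earliest time wins
    assert_eq!(cache.handle_pending_invalidation(), 3); // times 2, 3, 4 dropped
    assert_eq!(cache.entries.len(), 2);
}
```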

8 files changed: +505 −65

crates/re_query_cache/src/cache.rs (+121 −44)

```diff
@@ -129,7 +129,16 @@ impl Caches {
         let mut caches = caches.0.write();
 
         let caches_per_archetype = caches.entry(key.clone()).or_default();
-        caches_per_archetype.handle_pending_invalidation(&key);
+
+        let removed_bytes = caches_per_archetype.handle_pending_invalidation();
+        if removed_bytes > 0 {
+            re_log::trace!(
+                store_id = %key.store_id,
+                entity_path = %key.entity_path,
+                removed = removed_bytes,
+                "invalidated latest-at caches"
+            );
+        }
 
         let mut latest_at_per_archetype =
             caches_per_archetype.latest_at_per_archetype.write();
@@ -166,7 +175,16 @@ impl Caches {
         let mut caches = caches.0.write();
 
         let caches_per_archetype = caches.entry(key.clone()).or_default();
-        caches_per_archetype.handle_pending_invalidation(&key);
+
+        let removed_bytes = caches_per_archetype.handle_pending_invalidation();
+        if removed_bytes > 0 {
+            re_log::trace!(
+                store_id = %key.store_id,
+                entity_path = %key.entity_path,
+                removed = removed_bytes,
+                "invalidated range caches"
+            );
+        }
 
         let mut range_per_archetype = caches_per_archetype.range_per_archetype.write();
         let range_cache = range_per_archetype.entry(A::name()).or_default();
@@ -281,7 +299,7 @@ impl StoreSubscriber for Caches {
         // TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
         // yet another layer of caching indirection.
         // But since this pretty much never happens in practice, let's not go there until we
-        // have metrics showing that we need to.
+        // have metrics that show we need to.
         {
             re_tracing::profile_scope!("timeless");
 
@@ -318,62 +336,63 @@ impl CachesPerArchetype {
     ///
     /// Invalidation is deferred to query time because it is far more efficient that way: the frame
     /// time effectively behaves as a natural micro-batching mechanism.
-    fn handle_pending_invalidation(&mut self, key: &CacheKey) {
+    ///
+    /// Returns the number of bytes removed.
+    fn handle_pending_invalidation(&mut self) -> u64 {
         let pending_timeless_invalidation = self.pending_timeless_invalidation;
         let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();
 
         if !pending_timeless_invalidation && !pending_timeful_invalidation {
-            return;
+            return 0;
         }
 
         re_tracing::profile_function!();
 
-        // TODO(cmc): range invalidation
+        let time_threshold = self.pending_timeful_invalidation.unwrap_or(TimeInt::MAX);
 
-        for latest_at_cache in self.latest_at_per_archetype.read().values() {
-            let mut latest_at_cache = latest_at_cache.write();
-
-            if pending_timeless_invalidation {
-                latest_at_cache.timeless = None;
-            }
+        self.pending_timeful_invalidation = None;
+        self.pending_timeless_invalidation = false;
 
-            let mut removed_bytes = 0u64;
-            if let Some(min_time) = self.pending_timeful_invalidation {
-                latest_at_cache
-                    .per_query_time
-                    .retain(|&query_time, _| query_time < min_time);
+        // Timeless being infinitely into the past, this effectively invalidates _everything_ with
+        // the current coarse-grained / archetype-level caching strategy.
+        if pending_timeless_invalidation {
+            re_tracing::profile_scope!("timeless");
+
+            let latest_at_removed_bytes = self
+                .latest_at_per_archetype
+                .read()
+                .values()
+                .map(|latest_at_cache| latest_at_cache.read().total_size_bytes())
+                .sum::<u64>();
+            let range_removed_bytes = self
+                .range_per_archetype
+                .read()
+                .values()
+                .map(|range_cache| range_cache.read().total_size_bytes())
+                .sum::<u64>();
+
+            *self = CachesPerArchetype::default();
+
+            return latest_at_removed_bytes + range_removed_bytes;
+        }
 
-                latest_at_cache.per_data_time.retain(|&data_time, bucket| {
-                    if data_time < min_time {
-                        return true;
-                    }
+        re_tracing::profile_scope!("timeful");
 
-                    // Only if that bucket is about to be dropped.
-                    if Arc::strong_count(bucket) == 1 {
-                        removed_bytes += bucket.read().total_size_bytes;
-                    }
+        let mut removed_bytes = 0u64;
 
-                    false
-                });
-            }
+        for latest_at_cache in self.latest_at_per_archetype.read().values() {
+            let mut latest_at_cache = latest_at_cache.write();
+            removed_bytes =
+                removed_bytes.saturating_add(latest_at_cache.truncate_at_time(time_threshold));
+        }
 
-            latest_at_cache.total_size_bytes = latest_at_cache
-                .total_size_bytes
-                .checked_sub(removed_bytes)
-                .unwrap_or_else(|| {
-                    re_log::debug!(
-                        store_id = %key.store_id,
-                        entity_path = %key.entity_path,
-                        current = latest_at_cache.total_size_bytes,
-                        removed = removed_bytes,
-                        "book keeping underflowed"
-                    );
-                    u64::MIN
-                });
+        for range_cache in self.range_per_archetype.read().values() {
+            let mut range_cache = range_cache.write();
+            removed_bytes =
+                removed_bytes.saturating_add(range_cache.truncate_at_time(time_threshold));
         }
 
-        self.pending_timeful_invalidation = None;
-        self.pending_timeless_invalidation = false;
+        removed_bytes
     }
 }
 
@@ -558,6 +577,64 @@ impl CacheBucket {
             .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
         Some(data.range(entry_range))
     }
+
+    /// Removes everything from the bucket that corresponds to a time equal or greater than the
+    /// specified `threshold`.
+    ///
+    /// Returns the number of bytes removed.
+    #[inline]
+    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
+        let Self {
+            data_times,
+            pov_instance_keys,
+            components,
+            total_size_bytes,
+        } = self;
+
+        let mut removed_bytes = 0u64;
+
+        let threshold_idx = data_times.partition_point(|(data_time, _)| data_time < &threshold);
+
+        {
+            let total_size_bytes_before = data_times.total_size_bytes();
+            data_times.truncate(threshold_idx);
+            removed_bytes += total_size_bytes_before - data_times.total_size_bytes();
+        }
+
+        {
+            let total_size_bytes_before = pov_instance_keys.total_size_bytes();
+            pov_instance_keys.truncate(threshold_idx);
+            removed_bytes += total_size_bytes_before - pov_instance_keys.total_size_bytes();
+        }
+
+        for data in components.values_mut() {
+            let total_size_bytes_before = data.dyn_total_size_bytes();
+            data.dyn_truncate(threshold_idx);
+            removed_bytes += total_size_bytes_before - data.dyn_total_size_bytes();
+        }
+
+        debug_assert!({
+            let expected_num_entries = data_times.len();
+            data_times.len() == expected_num_entries
+                && pov_instance_keys.num_entries() == expected_num_entries
+                && components
+                    .values()
+                    .all(|data| data.dyn_num_entries() == expected_num_entries)
+        });
+
+        *total_size_bytes = total_size_bytes
+            .checked_sub(removed_bytes)
+            .unwrap_or_else(|| {
+                re_log::debug!(
+                    current = *total_size_bytes,
+                    removed = removed_bytes,
+                    "book keeping underflowed"
+                );
+                u64::MIN
+            });
+
+        removed_bytes
+    }
 }
 
 macro_rules! impl_insert {
@@ -591,7 +668,7 @@ macro_rules! impl_insert {
 
         {
             // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
-            // instead, that way we can efficiently computes its size while we're at it.
+            // instead, that way we can efficiently compute its size while we're at it.
            let added: FlatVecDeque<InstanceKey> = arch_view
                .iter_instance_keys()
                .collect::<VecDeque<InstanceKey>>()
```
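
The new `CacheBucket::truncate_at_time` above is column-store bookkeeping: `partition_point` locates the first row at or past the threshold in the sorted time column, every column is truncated at that same row index, and the freed bytes are measured as a per-column before/after size difference. A stripped-down sketch of just that accounting, with a hypothetical `Column` type standing in for `FlatVecDeque`:

```rust
// Hypothetical `Column` stands in for `FlatVecDeque`; this only illustrates
// the measure-truncate-measure accounting used by `truncate_at_time`.
struct Column {
    values: Vec<u64>,
}

impl Column {
    fn total_size_bytes(&self) -> u64 {
        (self.values.len() * std::mem::size_of::<u64>()) as u64
    }

    fn truncate(&mut self, at: usize) {
        self.values.truncate(at);
    }
}

/// Truncates every column at the same row index and returns the bytes freed.
fn truncate_rows(columns: &mut [Column], row: usize) -> u64 {
    let mut removed_bytes = 0u64;
    for column in columns.iter_mut() {
        let before = column.total_size_bytes();
        column.truncate(row);
        // Measuring before and after per column keeps the bookkeeping exact
        // regardless of how much each column actually frees.
        removed_bytes += before - column.total_size_bytes();
    }
    removed_bytes
}

fn main() {
    let mut columns = vec![
        Column { values: vec![1, 2, 3, 4] },
        Column { values: vec![5, 6, 7, 8] },
    ];
    // Drop rows 2 and 3 from both columns: 2 rows x 2 columns x 8 bytes.
    assert_eq!(truncate_rows(&mut columns, 2), 32);
}
```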

crates/re_query_cache/src/cache_stats.rs (+5 −6)

```diff
@@ -1,7 +1,7 @@
 use std::collections::BTreeMap;
 
 use re_log_types::{EntityPath, TimeRange, Timeline};
-use re_types_core::ComponentName;
+use re_types_core::{ComponentName, SizeBytes as _};
 
 use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache};
 
@@ -101,10 +101,10 @@ impl Caches {
                     per_query_time: _,
                     per_data_time,
                     timeless,
-                    total_size_bytes: _,
+                    ..
                 } = &*latest_at_cache.read();
 
-                total_size_bytes += latest_at_cache.total_size_bytes;
+                total_size_bytes += latest_at_cache.total_size_bytes();
                 total_rows = per_data_time.len() as u64 + timeless.is_some() as u64;
 
                 if let Some(per_component) = per_component.as_mut() {
@@ -141,10 +141,9 @@ impl Caches {
             .read()
             .values()
             .map(|range_cache| {
-                let RangeCache {
+                let range_cache @ RangeCache {
                     per_data_time,
                     timeless,
-                    total_size_bytes,
                 } = &*range_cache.read();
 
                 let total_rows = per_data_time.data_times.len() as u64;
@@ -161,7 +160,7 @@ impl Caches {
                     key.timeline,
                     per_data_time.time_range().unwrap_or(TimeRange::EMPTY),
                     CachedEntityStats {
-                        total_size_bytes: *total_size_bytes,
+                        total_size_bytes: range_cache.total_size_bytes(),
                         total_rows,
 
                         per_component,
```
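
The one subtle change in this file is the `range_cache @ RangeCache { … }` pattern: an `@` binding keeps a handle on the whole borrowed value while simultaneously destructuring its fields, which is what lets the stats code call `range_cache.total_size_bytes()` now that the dedicated size field is gone. A standalone illustration with a hypothetical `Stats` type:

```rust
// `whole @ Stats { .. }` binds the entire reference while also pulling fields
// out of it, mirroring the `range_cache @ RangeCache { .. }` pattern above.
// `Stats` is a hypothetical type for illustration.
struct Stats {
    rows: u64,
    bytes: u64,
}

fn main() {
    let stats = Stats { rows: 10, bytes: 640 };
    // `whole` is a `&Stats`; match ergonomics make `rows` a `&u64`.
    let whole @ Stats { rows, .. } = &stats;
    assert_eq!(*rows, 10);
    assert_eq!(whole.bytes, 640);
}
```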

crates/re_query_cache/src/flat_vec_deque.rs (+12 −1)

```diff
@@ -45,9 +45,15 @@ pub trait ErasedFlatVecDeque: std::any::Any {
     /// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
     /// avoid even with explicit syntax and that silently lead to infinite recursions.
     fn dyn_truncate(&mut self, at: usize);
+
+    /// Dynamically dispatches to [`<FlatVecDeque<T> as SizeBytes>::total_size_bytes(self)`].
+    ///
+    /// This is prefixed with `dyn_` to avoid method dispatch ambiguities that are very hard to
+    /// avoid even with explicit syntax and that silently lead to infinite recursions.
+    fn dyn_total_size_bytes(&self) -> u64;
 }
 
-impl<T: 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
+impl<T: SizeBytes + 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
     #[inline]
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -87,6 +93,11 @@ impl<T: 'static> ErasedFlatVecDeque for FlatVecDeque<T> {
     fn dyn_truncate(&mut self, at: usize) {
         FlatVecDeque::<T>::truncate(self, at);
     }
+
+    #[inline]
+    fn dyn_total_size_bytes(&self) -> u64 {
+        <FlatVecDeque<T> as SizeBytes>::total_size_bytes(self)
+    }
 }
 
 // ---
```
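
Worth spelling out why the trait insists on the `dyn_` prefix: if the object-safe facade reused the name `total_size_bytes`, a call inside the generic impl could resolve back to the trait method itself and recurse forever — exactly the failure mode the doc comment warns about, which the prefix plus fully-qualified syntax rules out. A self-contained sketch of the erasure pattern, with hypothetical `Seq`/`ErasedSeq` standing in for `FlatVecDeque`/`ErasedFlatVecDeque`:

```rust
// Hypothetical stand-ins for `FlatVecDeque`/`ErasedFlatVecDeque`, showing the
// `dyn_`-prefixed type-erasure pattern used above.
trait SizeBytes {
    fn total_size_bytes(&self) -> u64;
}

struct Seq<T> {
    values: Vec<T>,
}

impl<T> SizeBytes for Seq<T> {
    fn total_size_bytes(&self) -> u64 {
        (self.values.len() * std::mem::size_of::<T>()) as u64
    }
}

/// Object-safe facade so heterogeneous `Seq<T>`s can live behind one pointer.
trait ErasedSeq {
    fn dyn_total_size_bytes(&self) -> u64;
}

impl<T: 'static> ErasedSeq for Seq<T> {
    fn dyn_total_size_bytes(&self) -> u64 {
        // Fully-qualified syntax: unambiguously the `SizeBytes` impl, so this
        // can never dispatch back to `dyn_total_size_bytes` and recurse.
        <Seq<T> as SizeBytes>::total_size_bytes(self)
    }
}

fn main() {
    let seqs: Vec<Box<dyn ErasedSeq>> = vec![
        Box::new(Seq { values: vec![1u8, 2, 3] }), // 3 bytes
        Box::new(Seq { values: vec![1.0f64] }),    // 8 bytes
    ];
    let total: u64 = seqs.iter().map(|s| s.dyn_total_size_bytes()).sum();
    assert_eq!(total, 11);
}
```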

crates/re_query_cache/src/latest_at.rs (+60 −2)

```diff
@@ -7,7 +7,7 @@ use seq_macro::seq;
 use re_data_store::{DataStore, LatestAtQuery, TimeInt};
 use re_log_types::{EntityPath, RowId};
 use re_query::query_archetype;
-use re_types_core::{components::InstanceKey, Archetype, Component};
+use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};
 
 use crate::{CacheBucket, Caches, MaybeCachedComponentData};
 
@@ -38,7 +38,65 @@ pub struct LatestAtCache {
     pub timeless: Option<CacheBucket>,
 
     /// Total size of the data stored in this cache in bytes.
-    pub total_size_bytes: u64,
+    total_size_bytes: u64,
+}
+
+impl SizeBytes for LatestAtCache {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        self.total_size_bytes
+    }
+}
+
+impl LatestAtCache {
+    /// Removes everything from the cache that corresponds to a time equal or greater than the
+    /// specified `threshold`.
+    ///
+    /// Reminder: invalidating timeless data is the same as invalidating everything, so just reset
+    /// the `LatestAtCache` entirely in that case.
+    ///
+    /// Returns the number of bytes removed.
+    #[inline]
+    pub fn truncate_at_time(&mut self, threshold: TimeInt) -> u64 {
+        let Self {
+            per_query_time,
+            per_data_time,
+            timeless: _,
+            total_size_bytes,
+        } = self;
+
+        let mut removed_bytes = 0u64;
+
+        per_query_time.retain(|&query_time, _| query_time < threshold);
+
+        // Buckets for latest-at queries are guaranteed to only ever contain a single entry, so
+        // just remove the buckets entirely directly.
+        per_data_time.retain(|&data_time, bucket| {
+            if data_time < threshold {
+                return true;
+            }
+
+            // Only if that bucket is about to be dropped.
+            if Arc::strong_count(bucket) == 1 {
+                removed_bytes += bucket.read().total_size_bytes;
+            }
+
+            false
+        });
+
+        *total_size_bytes = total_size_bytes
+            .checked_sub(removed_bytes)
+            .unwrap_or_else(|| {
+                re_log::debug!(
+                    current = *total_size_bytes,
+                    removed = removed_bytes,
+                    "book keeping underflowed"
+                );
+                u64::MIN
+            });
+
+        removed_bytes
+    }
 }
 
 // --- Queries ---
```
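
Note the `Arc::strong_count(bucket) == 1` guard in `truncate_at_time`: bytes only count as removed when dropping the map's handle actually frees the bucket; a bucket still referenced elsewhere stays alive, so its bytes haven't actually been freed. A small stand-alone illustration of that accounting, using plain `Vec<u8>` buckets instead of `CacheBucket`:

```rust
// Illustrates strong_count-gated size accounting; plain `Vec<u8>` buckets
// stand in for `CacheBucket`.
use std::collections::BTreeMap;
use std::sync::Arc;

fn main() {
    let mut per_data_time: BTreeMap<i64, Arc<Vec<u8>>> = BTreeMap::new();
    per_data_time.insert(1, Arc::new(vec![0; 16]));
    per_data_time.insert(2, Arc::new(vec![0; 32]));

    // Simulate the bucket at t=2 also being referenced somewhere else.
    let shared = Arc::clone(&per_data_time[&2]);

    let threshold = 1;
    let mut removed_bytes = 0usize;
    per_data_time.retain(|&data_time, bucket| {
        if data_time < threshold {
            return true;
        }
        // Only count the bytes if dropping our handle frees the allocation.
        if Arc::strong_count(bucket) == 1 {
            removed_bytes += bucket.len();
        }
        false
    });

    // Both entries left the map, but only the uniquely-owned 16-byte bucket
    // is accounted as freed; `shared` keeps the 32-byte one alive.
    assert!(per_data_time.is_empty());
    assert_eq!(removed_bytes, 16);
    assert_eq!(shared.len(), 32);
}
```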
