Skip to content

Commit 8dd1681

Browse files
authored
Primary caching 7: Always expose the data time in query responses (#4711)
_99% grunt work, the only somewhat interesting thing happens in `query_archetype`_ Our query model always operates with two distinct timestamps: the timestamp you're querying for (`query_time`) vs. the timestamp of the data you get back (`data_time`). This is the result of our latest-at semantics: a query for a point at time `10` can return a point at time `2`. This is important to know when caching the data: a query at time `4` and a query at time `8` that both return the data at time `2` must share the same single entry or the memory budget would explode. This PR just updates all existing latest-at APIs so they return the data time in their response. This was already the case for range APIs. Note that in the case of `query_archetype`, which is a compound API that emits multiple queries, the data time of the final result is the most recent data time among all of its components. A follow-up PR will use the data time to deduplicate entries in the latest-at cache. --- Part of the primary caching series of PR (index search, joins, deserialization): - #4592 - #4593 - #4659 - #4680 - #4681 - #4698 - #4711 - #4712 - #4721 - #4726 - #4773 - #4784 - #4785 - #4793 - #4800
1 parent d8569ee commit 8dd1681

File tree

20 files changed

+269
-173
lines changed

20 files changed

+269
-173
lines changed

crates/re_data_store/benches/data_store.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ fn latest_data_at<const N: usize>(
458458

459459
store
460460
.latest_at(&timeline_query, &ent_path, primary, secondaries)
461-
.map_or_else(|| [(); N].map(|_| None), |(_, cells)| cells)
461+
.map_or_else(|| [(); N].map(|_| None), |(_, _, cells)| cells)
462462
}
463463

464464
fn range_data<const N: usize>(

crates/re_data_store/src/polars_util.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ pub fn latest_component(
4040
let components = &[cluster_key, primary];
4141
let (_, cells) = store
4242
.latest_at(query, ent_path, primary, components)
43-
.unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
43+
.map_or_else(
44+
|| (RowId::ZERO, [(); 2].map(|_| None)),
45+
|(_, row_id, cells)| (row_id, cells),
46+
);
4447

4548
dataframe_from_cells(&cells)
4649
}

crates/re_data_store/src/store_helpers.rs

+36-15
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,35 @@
1-
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimePoint, Timeline};
1+
use re_log_types::{DataCell, DataRow, EntityPath, RowId, TimeInt, TimePoint, Timeline};
22

33
use re_types_core::{Component, ComponentName};
44

55
use crate::{DataStore, LatestAtQuery};
66

77
// --- Read ---
88

9-
/// A [`Component`] versioned with a specific [`RowId`].
9+
/// A [`Component`] at a specific _data_ time, versioned with a specific [`RowId`].
1010
///
1111
/// This is not enough to globally, uniquely identify an instance of a component.
1212
/// For that you will need to combine the `InstancePath` that was used to query
1313
/// the versioned component with the returned [`RowId`], therefore creating a
1414
/// `VersionedInstancePath`.
1515
#[derive(Debug, Clone)]
1616
pub struct VersionedComponent<C: Component> {
17+
/// `None` if timeless.
18+
pub data_time: Option<TimeInt>,
19+
1720
pub row_id: RowId,
21+
1822
pub value: C,
1923
}
2024

21-
impl<C: Component> From<(RowId, C)> for VersionedComponent<C> {
25+
impl<C: Component> VersionedComponent<C> {
2226
#[inline]
23-
fn from((row_id, value): (RowId, C)) -> Self {
24-
Self { row_id, value }
27+
pub fn new(time: Option<TimeInt>, row_id: RowId, value: C) -> Self {
28+
Self {
29+
data_time: time,
30+
row_id,
31+
value,
32+
}
2533
}
2634
}
2735

@@ -35,7 +43,8 @@ impl<C: Component> std::ops::Deref for VersionedComponent<C> {
3543
}
3644

3745
impl DataStore {
38-
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
46+
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
47+
/// _data_ time and [`RowId`].
3948
///
4049
/// This assumes that the row we get from the store only contains a single instance for this
4150
/// component; it will generate a log message of `level` otherwise.
@@ -51,7 +60,8 @@ impl DataStore {
5160
) -> Option<VersionedComponent<C>> {
5261
re_tracing::profile_function!();
5362

54-
let (row_id, cells) = self.latest_at(query, entity_path, C::name(), &[C::name()])?;
63+
let (data_time, row_id, cells) =
64+
self.latest_at(query, entity_path, C::name(), &[C::name()])?;
5565
let cell = cells.first()?.as_ref()?;
5666

5767
cell.try_to_native_mono::<C>()
@@ -93,10 +103,11 @@ impl DataStore {
93103
err
94104
})
95105
.ok()?
96-
.map(|c| (row_id, c).into())
106+
.map(|c| VersionedComponent::new(data_time, row_id, c))
97107
}
98108

99-
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
109+
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
110+
/// _data_ time and [`RowId`].
100111
///
101112
/// This assumes that the row we get from the store only contains a single instance for this
102113
/// component; it will log a warning otherwise.
@@ -113,7 +124,8 @@ impl DataStore {
113124
self.query_latest_component_with_log_level(entity_path, query, re_log::Level::Warn)
114125
}
115126

116-
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`].
127+
/// Get the latest value for a given [`re_types_core::Component`], as well as the associated
128+
/// _data_ time and [`RowId`].
117129
///
118130
/// This assumes that the row we get from the store only contains a single instance for this
119131
/// component; it will return None and log a debug message otherwise.
@@ -140,15 +152,16 @@ impl DataStore {
140152

141153
let mut cur_path = Some(entity_path.clone());
142154
while let Some(path) = cur_path {
143-
if let Some(c) = self.query_latest_component::<C>(&path, query) {
144-
return Some((path, c));
155+
if let Some(vc) = self.query_latest_component::<C>(&path, query) {
156+
return Some((path, vc));
145157
}
146158
cur_path = path.parent();
147159
}
148160
None
149161
}
150162

151-
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
163+
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
164+
/// assuming it is timeless.
152165
///
153166
/// This assumes that the row we get from the store only contains a single instance for this
154167
/// component; it will log a warning otherwise.
@@ -163,10 +176,14 @@ impl DataStore {
163176
re_tracing::profile_function!();
164177

165178
let query = LatestAtQuery::latest(Timeline::default());
166-
self.query_latest_component(entity_path, &query)
179+
self.query_latest_component(entity_path, &query).map(|vc| {
180+
debug_assert!(vc.data_time.is_none());
181+
vc
182+
})
167183
}
168184

169-
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`], assuming it is timeless.
185+
/// Get the latest value for a given [`re_types_core::Component`] and the associated [`RowId`],
186+
/// assuming it is timeless.
170187
///
171188
/// This assumes that the row we get from the store only contains a single instance for this
172189
/// component; it will return None and log a debug message otherwise.
@@ -182,6 +199,10 @@ impl DataStore {
182199

183200
let query = LatestAtQuery::latest(Timeline::default());
184201
self.query_latest_component_quiet(entity_path, &query)
202+
.map(|vc| {
203+
debug_assert!(vc.data_time.is_none());
204+
vc
205+
})
185206
}
186207
}
187208

crates/re_data_store/src/store_read.rs

+52-33
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,9 @@ impl DataStore {
205205
/// Queries the datastore for the cells of the specified `components`, as seen from the point
206206
/// of view of the so-called `primary` component.
207207
///
208-
/// Returns an array of [`DataCell`]s on success, or `None` otherwise.
209-
/// Success is defined by one thing and thing only: whether a cell could be found for the
208+
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
209+
/// success.
210+
/// Success is defined by one thing and one thing only: whether a cell could be found for the
210211
/// `primary` component.
211212
/// The presence or absence of secondary components has no effect on the success criteria.
212213
///
@@ -238,7 +239,10 @@ impl DataStore {
238239
/// let components = &[cluster_key, primary];
239240
/// let (_, cells) = store
240241
/// .latest_at(&query, ent_path, primary, components)
241-
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
242+
/// .map_or_else(
243+
/// || (RowId::ZERO, [(); 2].map(|_| None)),
244+
/// |(_, row_id, cells)| (row_id, cells),
245+
/// );
242246
///
243247
/// let series: Result<Vec<_>, _> = cells
244248
/// .iter()
@@ -266,7 +270,7 @@ impl DataStore {
266270
ent_path: &EntityPath,
267271
primary: ComponentName,
268272
components: &[ComponentName; N],
269-
) -> Option<(RowId, [Option<DataCell>; N])> {
273+
) -> Option<(Option<TimeInt>, RowId, [Option<DataCell>; N])> {
270274
// TODO(cmc): kind & query_id need to somehow propagate through the span system.
271275
self.query_id.fetch_add(1, Ordering::Relaxed);
272276

@@ -282,7 +286,7 @@ impl DataStore {
282286
"query started…"
283287
);
284288

285-
let cells = self
289+
let results = self
286290
.tables
287291
.get(&(ent_path_hash, query.timeline))
288292
.and_then(|table| {
@@ -301,11 +305,11 @@ impl DataStore {
301305

302306
// If we've found everything we were looking for in the temporal table, then we can
303307
// return the results immediately.
304-
if cells
308+
if results
305309
.as_ref()
306-
.map_or(false, |(_, cells)| cells.iter().all(Option::is_some))
310+
.map_or(false, |(_, _, cells)| cells.iter().all(Option::is_some))
307311
{
308-
return cells;
312+
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells));
309313
}
310314

311315
let cells_timeless = self.timeless_tables.get(&ent_path_hash).and_then(|table| {
@@ -324,21 +328,28 @@ impl DataStore {
324328
});
325329

326330
// Otherwise, let's see what's in the timeless table, and then..:
327-
match (cells, cells_timeless) {
331+
match (results, cells_timeless) {
328332
// nothing in the timeless table: return those partial cells we got.
329-
(Some(cells), None) => return Some(cells),
333+
(results @ Some(_), None) => {
334+
return results.map(|(data_time, row_id, cells)| (Some(data_time), row_id, cells))
335+
}
336+
330337
// no temporal cells, but some timeless ones: return those as-is.
331-
(None, Some(cells_timeless)) => return Some(cells_timeless),
338+
(None, results @ Some(_)) => {
339+
return results.map(|(row_id, cells)| (None, row_id, cells))
340+
}
341+
332342
// we have both temporal & timeless cells: let's merge the two when it makes sense
333343
// and return the end result.
334-
(Some((row_id, mut cells)), Some((_, cells_timeless))) => {
344+
(Some((data_time, row_id, mut cells)), Some((_, cells_timeless))) => {
335345
for (i, row_idx) in cells_timeless.into_iter().enumerate() {
336346
if cells[i].is_none() {
337347
cells[i] = row_idx;
338348
}
339349
}
340-
return Some((row_id, cells));
350+
return Some((Some(data_time), row_id, cells));
341351
}
352+
342353
// no cells at all.
343354
(None, None) => {}
344355
}
@@ -428,7 +439,10 @@ impl DataStore {
428439
/// let query = LatestAtQuery::new(query.timeline, latest_time);
429440
/// let (_, cells) = store
430441
/// .latest_at(&query, ent_path, primary, &components)
431-
/// .unwrap_or((RowId::ZERO, [(); 2].map(|_| None)));
442+
/// .map_or_else(
443+
/// || (RowId::ZERO, [(); 2].map(|_| None)),
444+
/// |(_, row_id, cells)| (row_id, cells),
445+
/// );
432446
/// dataframe_from_cells(cells)
433447
/// };
434448
///
@@ -519,14 +533,14 @@ impl IndexedTable {
519533
/// Queries the table for the cells of the specified `components`, as seen from the point
520534
/// of view of the so-called `primary` component.
521535
///
522-
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
523-
/// the `primary` component.
536+
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
537+
/// success, or `None` iff no cell could be found for the `primary` component.
524538
pub fn latest_at<const N: usize>(
525539
&self,
526-
time: TimeInt,
540+
query_time: TimeInt,
527541
primary: ComponentName,
528542
components: &[ComponentName; N],
529-
) -> Option<(RowId, [Option<DataCell>; N])> {
543+
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
530544
// Early-exit if this entire table is unaware of this component.
531545
if !self.all_components.contains(&primary) {
532546
return None;
@@ -542,22 +556,22 @@ impl IndexedTable {
542556
// multiple indexed buckets within the same table!
543557

544558
let buckets = self
545-
.range_buckets_rev(..=time)
559+
.range_buckets_rev(..=query_time)
546560
.map(|(_, bucket)| bucket)
547561
.enumerate();
548562
for (attempt, bucket) in buckets {
549563
trace!(
550564
kind = "latest_at",
551565
timeline = %timeline.name(),
552-
time = timeline.typ().format_utc(time),
566+
time = timeline.typ().format_utc(query_time),
553567
%primary,
554568
?components,
555569
attempt,
556570
bucket_time_range = timeline.typ().format_range_utc(bucket.inner.read().time_range),
557571
"found candidate bucket"
558572
);
559-
if let cells @ Some(_) = bucket.latest_at(time, primary, components) {
560-
return cells; // found at least the primary component!
573+
if let ret @ Some(_) = bucket.latest_at(query_time, primary, components) {
574+
return ret; // found at least the primary component!
561575
}
562576
}
563577

@@ -717,14 +731,14 @@ impl IndexedBucket {
717731
/// Queries the bucket for the cells of the specified `components`, as seen from the point
718732
/// of view of the so-called `primary` component.
719733
///
720-
/// Returns an array of [`DataCell`]s on success, or `None` iff no cell could be found for
721-
/// the `primary` component.
734+
/// Returns an array of [`DataCell`]s (as well as the associated _data_ time and `RowId`) on
735+
/// success, or `None` iff no cell could be found for the `primary` component.
722736
pub fn latest_at<const N: usize>(
723737
&self,
724-
time: TimeInt,
738+
query_time: TimeInt,
725739
primary: ComponentName,
726740
components: &[ComponentName; N],
727-
) -> Option<(RowId, [Option<DataCell>; N])> {
741+
) -> Option<(TimeInt, RowId, [Option<DataCell>; N])> {
728742
self.sort_indices_if_needed();
729743

730744
let IndexedBucketInner {
@@ -748,11 +762,12 @@ impl IndexedBucket {
748762
%primary,
749763
?components,
750764
timeline = %self.timeline.name(),
751-
time = self.timeline.typ().format_utc(time),
765+
query_time = self.timeline.typ().format_utc(query_time),
752766
"searching for primary & secondary cells…"
753767
);
754768

755-
let time_row_nr = col_time.partition_point(|t| *t <= time.as_i64()) as i64;
769+
let time_row_nr =
770+
col_time.partition_point(|data_time| *data_time <= query_time.as_i64()) as i64;
756771

757772
// The partition point is always _beyond_ the index that we're looking for.
758773
// A partition point of 0 thus means that we're trying to query for data that lives
@@ -769,7 +784,7 @@ impl IndexedBucket {
769784
%primary,
770785
?components,
771786
timeline = %self.timeline.name(),
772-
time = self.timeline.typ().format_utc(time),
787+
query_time = self.timeline.typ().format_utc(query_time),
773788
%primary_row_nr,
774789
"found primary row number",
775790
);
@@ -783,7 +798,7 @@ impl IndexedBucket {
783798
%primary,
784799
?components,
785800
timeline = %self.timeline.name(),
786-
time = self.timeline.typ().format_utc(time),
801+
query_time = self.timeline.typ().format_utc(query_time),
787802
%primary_row_nr,
788803
"no secondary row number found",
789804
);
@@ -797,7 +812,7 @@ impl IndexedBucket {
797812
%primary,
798813
?components,
799814
timeline = %self.timeline.name(),
800-
time = self.timeline.typ().format_utc(time),
815+
query_time = self.timeline.typ().format_utc(query_time),
801816
%primary_row_nr, %secondary_row_nr,
802817
"found secondary row number",
803818
);
@@ -812,7 +827,7 @@ impl IndexedBucket {
812827
%primary,
813828
%component,
814829
timeline = %self.timeline.name(),
815-
time = self.timeline.typ().format_utc(time),
830+
query_time = self.timeline.typ().format_utc(query_time),
816831
%primary_row_nr, %secondary_row_nr,
817832
"found cell",
818833
);
@@ -821,7 +836,11 @@ impl IndexedBucket {
821836
}
822837
}
823838

824-
Some((col_row_id[secondary_row_nr as usize], cells))
839+
Some((
840+
col_time[secondary_row_nr as usize].into(),
841+
col_row_id[secondary_row_nr as usize],
842+
cells,
843+
))
825844
}
826845

827846
/// Iterates the bucket in order to return the cells of the specified `components`,

0 commit comments

Comments
 (0)