Skip to content

Commit c4cc379

Browse files
committed
Merge branch 'main' into antoine/light-arrows
2 parents 4dd75c5 + 867ec9e commit c4cc379

File tree

13 files changed

+368
-275
lines changed

13 files changed

+368
-275
lines changed

crates/store/re_chunk/src/chunk.rs

+26
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,32 @@ impl Chunk {
414414
self
415415
}
416416

417+
/// Clones the chunk into a new chunk where all descriptors are untagged.
418+
///
419+
/// Only useful as a migration tool while the Rerun ecosystem slowly moves over
420+
/// to always using tags for everything.
421+
#[doc(hidden)]
422+
#[inline]
423+
pub fn clone_as_untagged(&self) -> Self {
424+
let mut chunk = self.clone();
425+
426+
let per_component_name = &mut chunk.components;
427+
for (component_name, per_desc) in per_component_name.iter_mut() {
428+
if per_desc.len() != 1 {
429+
// If there are more than one entry, then we're in the land of UB anyway (for now).
430+
continue;
431+
}
432+
433+
let untagged_descriptor = ComponentDescriptor::new(*component_name);
434+
*per_desc = std::mem::take(per_desc)
435+
.into_values()
436+
.map(|list_array| (untagged_descriptor.clone(), list_array))
437+
.collect();
438+
}
439+
440+
chunk
441+
}
442+
417443
/// Clones the chunk into a new chunk where all [`RowId`]s are [`RowId::ZERO`].
418444
pub fn zeroed(self) -> Self {
419445
let row_ids = std::iter::repeat(RowId::ZERO)

crates/store/re_chunk_store/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats};
3939
pub use self::store::{
4040
ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata,
4141
};
42-
pub use self::subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle};
42+
pub use self::subscribers::{
43+
ChunkStoreSubscriber, ChunkStoreSubscriberHandle, PerStoreChunkSubscriber,
44+
};
4345

4446
pub(crate) use self::store::ColumnMetadataState;
4547

crates/store/re_chunk_store/src/query.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,10 @@ impl ChunkStore {
592592
})
593593
.unwrap_or_default();
594594

595-
debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());
595+
debug_assert!(
596+
chunks.iter().map(|chunk| chunk.id()).all_unique(),
597+
"{entity_path}:{component_name} @ {query:?}",
598+
);
596599

597600
chunks
598601
}

crates/store/re_chunk_store/src/subscribers.rs

+131
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use ahash::HashMap;
2+
use itertools::Itertools as _;
13
use parking_lot::RwLock;
4+
use re_log_types::StoreId;
25

36
use crate::{ChunkStore, ChunkStoreEvent};
47

@@ -57,6 +60,20 @@ pub trait ChunkStoreSubscriber: std::any::Any + Send + Sync {
5760
fn on_events(&mut self, events: &[ChunkStoreEvent]);
5861
}
5962

63+
/// A [`ChunkStoreSubscriber`] that is instantiated for each unique [`StoreId`].
64+
pub trait PerStoreChunkSubscriber: Send + Sync + Default {
65+
/// Arbitrary name for the subscriber.
66+
///
67+
/// Does not need to be unique.
68+
fn name() -> String;
69+
70+
/// Get notified of changes happening in a [`ChunkStore`], see [`ChunkStoreSubscriber::on_events`].
71+
///
72+
/// Unlike [`ChunkStoreSubscriber::on_events`], all items are guaranteed to have the same [`StoreId`]
73+
/// which does not change per invocation.
74+
fn on_events<'a>(&mut self, events: impl Iterator<Item = &'a ChunkStoreEvent>);
75+
}
76+
6077
/// All registered [`ChunkStoreSubscriber`]s.
6178
static SUBSCRIBERS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreSubscriber>>> =
6279
once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
@@ -143,6 +160,79 @@ impl ChunkStore {
143160
})
144161
}
145162

163+
/// Registers a [`PerStoreChunkSubscriber`] type so it gets automatically notified when data gets added and/or
164+
/// removed to/from a [`ChunkStore`].
165+
pub fn register_per_store_subscriber<S: PerStoreChunkSubscriber + Default + 'static>(
166+
) -> ChunkStoreSubscriberHandle {
167+
let mut subscribers = SUBSCRIBERS.write();
168+
subscribers.push(RwLock::new(Box::new(
169+
PerStoreStoreSubscriberWrapper::<S>::default(),
170+
)));
171+
ChunkStoreSubscriberHandle(subscribers.len() as u32 - 1)
172+
}
173+
174+
/// Passes a reference to the downcasted per-store subscriber to the given `FnMut` callback.
175+
///
176+
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
177+
pub fn with_per_store_subscriber<S: PerStoreChunkSubscriber + 'static, T, F: FnMut(&S) -> T>(
178+
ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
179+
store_id: &StoreId,
180+
mut f: F,
181+
) -> Option<T> {
182+
let subscribers = SUBSCRIBERS.read();
183+
subscribers.get(handle as usize).and_then(|subscriber| {
184+
let subscriber = subscriber.read();
185+
subscriber
186+
.as_any()
187+
.downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
188+
.and_then(|wrapper| wrapper.get(store_id).map(&mut f))
189+
})
190+
}
191+
192+
/// Passes a reference to the downcasted per-store subscriber to the given `FnOnce` callback.
193+
///
194+
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
195+
pub fn with_per_store_subscriber_once<
196+
S: PerStoreChunkSubscriber + 'static,
197+
T,
198+
F: FnOnce(&S) -> T,
199+
>(
200+
ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
201+
store_id: &StoreId,
202+
f: F,
203+
) -> Option<T> {
204+
let subscribers = SUBSCRIBERS.read();
205+
subscribers.get(handle as usize).and_then(|subscriber| {
206+
let subscriber = subscriber.read();
207+
subscriber
208+
.as_any()
209+
.downcast_ref::<PerStoreStoreSubscriberWrapper<S>>()
210+
.and_then(|wrapper| wrapper.get(store_id).map(f))
211+
})
212+
}
213+
214+
/// Passes a mutable reference to the downcasted per-store subscriber to the given callback.
215+
///
216+
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
217+
pub fn with_per_store_subscriber_mut<
218+
S: PerStoreChunkSubscriber + 'static,
219+
T,
220+
F: FnMut(&mut S) -> T,
221+
>(
222+
ChunkStoreSubscriberHandle(handle): ChunkStoreSubscriberHandle,
223+
store_id: &StoreId,
224+
mut f: F,
225+
) -> Option<T> {
226+
let subscribers = SUBSCRIBERS.read();
227+
subscribers.get(handle as usize).and_then(|subscriber| {
228+
let mut subscriber = subscriber.write();
229+
subscriber
230+
.as_any_mut()
231+
.downcast_mut::<PerStoreStoreSubscriberWrapper<S>>()
232+
.and_then(|wrapper| wrapper.get_mut(store_id).map(&mut f))
233+
})
234+
}
235+
146236
/// Called by [`ChunkStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
147237
pub(crate) fn on_events(events: &[ChunkStoreEvent]) {
148238
re_tracing::profile_function!();
@@ -154,6 +244,47 @@ impl ChunkStore {
154244
}
155245
}
156246

247+
/// Utility that makes a [`PerStoreChunkSubscriber`] a [`ChunkStoreSubscriber`].
248+
#[derive(Default)]
249+
struct PerStoreStoreSubscriberWrapper<S: PerStoreChunkSubscriber> {
250+
subscribers: HashMap<StoreId, Box<S>>,
251+
}
252+
253+
impl<S: PerStoreChunkSubscriber + 'static> PerStoreStoreSubscriberWrapper<S> {
254+
fn get(&self, store_id: &StoreId) -> Option<&S> {
255+
self.subscribers.get(store_id).map(|s| s.as_ref())
256+
}
257+
258+
fn get_mut(&mut self, store_id: &StoreId) -> Option<&mut S> {
259+
self.subscribers.get_mut(store_id).map(|s| s.as_mut())
260+
}
261+
}
262+
263+
impl<S: PerStoreChunkSubscriber + 'static> ChunkStoreSubscriber
264+
for PerStoreStoreSubscriberWrapper<S>
265+
{
266+
fn name(&self) -> String {
267+
S::name()
268+
}
269+
270+
fn as_any(&self) -> &dyn std::any::Any {
271+
self
272+
}
273+
274+
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
275+
self
276+
}
277+
278+
fn on_events(&mut self, events: &[ChunkStoreEvent]) {
279+
for (store_id, events) in &events.iter().chunk_by(|e| e.store_id.clone()) {
280+
self.subscribers
281+
.entry(store_id)
282+
.or_default()
283+
.on_events(events);
284+
}
285+
}
286+
}
287+
157288
#[cfg(test)]
158289
mod tests {
159290
use std::sync::Arc;

crates/store/re_chunk_store/src/writes.rs

+30-21
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,6 @@ impl ChunkStore {
3737
return Ok(Vec::new());
3838
}
3939

40-
// NOTE: It is very easy to end in a situation where one pulls an old blueprint from
41-
// somewhere, and then modifies it at runtime, therefore ending with both tagged and
42-
// untagged components in the data.
43-
// I'd like to keep this debug assertion a tiny bit longer just in case, so for we just
44-
// ignore blueprints.
45-
#[cfg(debug_assertions)]
46-
if self.id.kind == re_log_types::StoreKind::Recording {
47-
for (component_name, per_desc) in chunk.components().iter() {
48-
assert!(
49-
per_desc.len() <= 1,
50-
"[DEBUG ONLY] Insert Chunk with multiple values for component named `{component_name}`: this is currently UB",
51-
);
52-
}
53-
}
54-
5540
if !chunk.is_sorted() {
5641
return Err(ChunkStoreError::UnsortedChunk);
5742
}
@@ -64,7 +49,31 @@ impl ChunkStore {
6449

6550
self.insert_id += 1;
6651

67-
let non_compacted_chunk = Arc::clone(chunk); // we'll need it to create the store event
52+
let mut chunk = Arc::clone(chunk);
53+
54+
// We're in a transition period during which the Rerun ecosystem is slowly moving over to tagged data.
55+
//
56+
// During that time, it is common to end up in situations where the blueprint intermixes both tagged
57+
// and untagged components, which invariably leads to undefined behavior.
58+
// To prevent that, we just always hot-patch it to untagged, for now.
59+
//
60+
// Examples:
61+
// * An SDK logs a blueprint (tagged), which is then updated by the viewer (which uses untagged log calls).
62+
// * Somebody loads an old .rbl from somewhere and starts logging new blueprint data to it.
63+
// * Etc.
64+
if self.id.kind == re_log_types::StoreKind::Blueprint {
65+
chunk = Arc::new(chunk.clone_as_untagged());
66+
}
67+
68+
#[cfg(debug_assertions)]
69+
for (component_name, per_desc) in chunk.components().iter() {
70+
assert!(
71+
per_desc.len() <= 1,
72+
"[DEBUG ONLY] Insert Chunk with multiple values for component named `{component_name}`: this is currently UB",
73+
);
74+
}
75+
76+
let non_compacted_chunk = Arc::clone(&chunk); // we'll need it to create the store event
6877

6978
let (chunk, diffs) = if chunk.is_static() {
7079
// Static data: make sure to keep the most recent chunk available for each component column.
@@ -140,7 +149,7 @@ impl ChunkStore {
140149
.or_insert_with(|| chunk.id());
141150
}
142151

143-
self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(chunk);
152+
self.static_chunks_stats += ChunkStoreChunkStats::from_chunk(&chunk);
144153

145154
let mut diffs = vec![ChunkStoreDiff::addition(
146155
non_compacted_chunk, /* added */
@@ -194,23 +203,23 @@ impl ChunkStore {
194203
}
195204
}
196205

197-
(Arc::clone(chunk), diffs)
206+
(Arc::clone(&chunk), diffs)
198207
} else {
199208
// Temporal data: just index the chunk on every dimension of interest.
200209
re_tracing::profile_scope!("temporal");
201210

202211
let (elected_chunk, chunk_or_compacted) = {
203212
re_tracing::profile_scope!("election");
204213

205-
let elected_chunk = self.find_and_elect_compaction_candidate(chunk);
214+
let elected_chunk = self.find_and_elect_compaction_candidate(&chunk);
206215

207216
let chunk_or_compacted = if let Some(elected_chunk) = &elected_chunk {
208217
let chunk_rowid_min = chunk.row_id_range().map(|(min, _)| min);
209218
let elected_rowid_min = elected_chunk.row_id_range().map(|(min, _)| min);
210219

211220
let mut compacted = if elected_rowid_min < chunk_rowid_min {
212221
re_tracing::profile_scope!("concat");
213-
elected_chunk.concatenated(chunk)?
222+
elected_chunk.concatenated(&chunk)?
214223
} else {
215224
re_tracing::profile_scope!("concat");
216225
chunk.concatenated(elected_chunk)?
@@ -233,7 +242,7 @@ impl ChunkStore {
233242

234243
Arc::new(compacted)
235244
} else {
236-
Arc::clone(chunk)
245+
Arc::clone(&chunk)
237246
};
238247

239248
(elected_chunk, chunk_or_compacted)

crates/viewer/re_time_panel/src/data_density_graph.rs

+28-22
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use re_chunk_store::RangeQuery;
1414
use re_log_types::{ComponentPath, ResolvedTimeRange, TimeInt, Timeline};
1515
use re_viewer_context::{Item, TimeControl, UiLayout, ViewerContext};
1616

17-
use crate::recursive_chunks_per_timeline_subscriber::PathRecursiveChunksPerTimeline;
17+
use crate::recursive_chunks_per_timeline_subscriber::PathRecursiveChunksPerTimelineStoreSubscriber;
1818
use crate::TimePanelItem;
1919

2020
use super::time_ranges_ui::TimeRangesUi;
@@ -472,27 +472,33 @@ pub fn build_density_graph<'a>(
472472
total_num_events,
473473
)
474474
} else {
475-
PathRecursiveChunksPerTimeline::access(&store.id(), |chunks_per_timeline| {
476-
let Some(info) = chunks_per_timeline
477-
.path_recursive_chunks_for_entity_and_timeline(&item.entity_path, &timeline)
478-
else {
479-
return Default::default();
480-
};
481-
482-
(
483-
info.recursive_chunks_info
484-
.values()
485-
.map(|info| {
486-
(
487-
info.chunk.clone(),
488-
info.resolved_time_range,
489-
info.num_events,
490-
)
491-
})
492-
.collect(),
493-
info.total_num_events,
494-
)
495-
})
475+
PathRecursiveChunksPerTimelineStoreSubscriber::access(
476+
&store.id(),
477+
|chunks_per_timeline| {
478+
let Some(info) = chunks_per_timeline
479+
.path_recursive_chunks_for_entity_and_timeline(
480+
&item.entity_path,
481+
&timeline,
482+
)
483+
else {
484+
return Default::default();
485+
};
486+
487+
(
488+
info.recursive_chunks_info
489+
.values()
490+
.map(|info| {
491+
(
492+
info.chunk.clone(),
493+
info.resolved_time_range,
494+
info.num_events,
495+
)
496+
})
497+
.collect(),
498+
info.total_num_events,
499+
)
500+
},
501+
)
496502
.unwrap_or_default()
497503
}
498504
};

crates/viewer/re_time_panel/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use re_viewer_context::{
3333
};
3434
use re_viewport_blueprint::ViewportBlueprint;
3535

36-
use recursive_chunks_per_timeline_subscriber::PathRecursiveChunksPerTimeline;
36+
use recursive_chunks_per_timeline_subscriber::PathRecursiveChunksPerTimelineStoreSubscriber;
3737
use time_axis::TimelineAxis;
3838
use time_control_ui::TimeControlUi;
3939
use time_ranges_ui::TimeRangesUi;
@@ -147,7 +147,7 @@ impl TimePanel {
147147
/// This is implicitly called by [`Self::default`], but may need to be explicitly called in,
148148
/// e.g., testing context.
149149
pub fn ensure_registered_subscribers() {
150-
PathRecursiveChunksPerTimeline::ensure_registered();
150+
PathRecursiveChunksPerTimelineStoreSubscriber::ensure_registered();
151151
}
152152

153153
pub fn new_blueprint_panel() -> Self {

0 commit comments

Comments
 (0)