Skip to content

Introduce IndexCell #9226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,10 +740,11 @@ impl PendingRow {

let timelines = timepoint
.into_iter()
.map(|(timeline, time)| {
let times = ArrowScalarBuffer::from(vec![time.as_i64()]);
let time_column = TimeColumn::new(Some(true), timeline, times);
(*timeline.name(), time_column)
.map(|(timeline_name, cell)| {
let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
let time_column =
TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
(timeline_name, time_column)
})
.collect();

Expand Down Expand Up @@ -801,9 +802,9 @@ impl PendingRow {
// deterministic: `TimePoint` is backed by a `BTreeMap`.
for row in rows {
let mut hasher = ahash::AHasher::default();
row.timepoint
.timelines()
.for_each(|timeline| timeline.hash(&mut hasher));
row.timepoint.timeline_names().for_each(|timeline| {
<TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
});

per_timeline_set
.entry(hasher.finish())
Expand Down Expand Up @@ -876,10 +877,10 @@ impl PendingRow {
// Look for unsorted timelines -- if we find any, and the chunk is larger than
// the pre-configured `chunk_max_rows_if_unsorted` threshold, then split _even_
// further!
for (&timeline, _) in row_timepoint {
let time_column = timelines
.entry(*timeline.name())
.or_insert_with(|| PendingTimeColumn::new(timeline));
for (&timeline_name, cell) in row_timepoint {
let time_column = timelines.entry(timeline_name).or_insert_with(|| {
PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
});

if !row_ids.is_empty() // just being extra cautious
&& row_ids.len() as u64 >= chunk_max_rows_if_unsorted
Expand Down Expand Up @@ -913,11 +914,11 @@ impl PendingRow {

row_ids.push(*row_id);

for (&timeline, &time) in row_timepoint {
let time_column = timelines
.entry(*timeline.name())
.or_insert_with(|| PendingTimeColumn::new(timeline));
time_column.push(time);
for (&timeline_name, &cell) in row_timepoint {
let time_column = timelines.entry(timeline_name).or_insert_with(|| {
PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
});
time_column.push(cell.into());
}

for (component_desc, arrays) in &mut components {
Expand Down
26 changes: 7 additions & 19 deletions crates/store/re_chunk/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arrow::{array::ArrayRef, datatypes::DataType as ArrowDatatype};
use itertools::Itertools as _;
use nohash_hasher::IntMap;

use re_log_types::{EntityPath, TimeInt, TimePoint, TimeType, Timeline, TimelineName};
use re_log_types::{EntityPath, NonMinI64, TimePoint, Timeline, TimelineName};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};

use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
Expand Down Expand Up @@ -72,11 +72,11 @@ impl ChunkBuilder {

self.row_ids.push(row_id);

for (timeline, time) in timepoint.into() {
for (timeline, cell) in timepoint.into() {
self.timelines
.entry(*timeline.name())
.or_insert_with(|| TimeColumn::builder(timeline))
.with_row(timeline.typ(), time);
.entry(timeline)
.or_insert_with(|| TimeColumn::builder(Timeline::new(timeline, cell.typ())))
.with_row(cell.value);
}

for (component_name, array) in components {
Expand Down Expand Up @@ -387,20 +387,8 @@ impl TimeColumnBuilder {

/// Add a row's worth of time data using the given timestamp.
#[inline]
pub fn with_row(&mut self, typ: TimeType, time: TimeInt) -> &mut Self {
let Self { timeline, times } = self;

if timeline.typ() != typ {
re_log::warn_once!(
"Mixing {:?} and {:?} in the same time column '{}'",
typ,
timeline.typ(),
timeline.name()
);
}

times.push(time.as_i64());

pub fn with_row(&mut self, time: NonMinI64) -> &mut Self {
self.times.push(time.into());
self
}

Expand Down
14 changes: 13 additions & 1 deletion crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use nohash_hasher::IntMap;
use re_arrow_util::ArrowArrayDowncastRef as _;
use re_byte_size::SizeBytes as _;
use re_log_types::{
EntityPath, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline, TimelineName,
EntityPath, NonMinI64, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline, TimelineName,
};
use re_types_core::{
ComponentDescriptor, ComponentName, DeserializationError, Loggable as _, SerializationError,
Expand Down Expand Up @@ -1240,11 +1240,23 @@ impl TimeColumn {
self.timeline.typ().make_arrow_array(self.times.clone())
}

/// All times in a time column are guaranteed not to have the value `i64::MIN`
/// (which is reserved for static data).
#[inline]
pub fn times_raw(&self) -> &[i64] {
self.times.as_ref()
}

/// All times in a time column are guaranteed not to have the value `i64::MIN`
/// (which is reserved for static data).
#[inline]
pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
self.times_raw()
.iter()
.copied()
.map(NonMinI64::saturating_from_i64)
}

#[inline]
pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
self.times_raw().iter().copied().map(TimeInt::new_temporal)
Expand Down
6 changes: 4 additions & 2 deletions crates/store/re_chunk/src/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ impl LatestAtQuery {
/// The returned query is guaranteed to never include [`TimeInt::STATIC`].
#[inline]
pub fn new(timeline: TimelineName, at: impl TryInto<TimeInt>) -> Self {
let at = at.try_into().unwrap_or(TimeInt::MIN);
Self { timeline, at }
Self {
timeline,
at: TimeInt::saturated_nonstatic(at),
}
}

#[inline]
Expand Down
9 changes: 5 additions & 4 deletions crates/store/re_data_loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,19 @@ impl DataLoaderSettings {
args.push("--static".to_owned());
}

for (timeline, time) in timepoint.iter() {
match timeline.typ() {
for (timeline, cell) in timepoint.iter() {
// TODO(#8635): update this
match cell.typ() {
re_log_types::TimeType::Time => {
args.extend([
"--time".to_owned(),
format!("{}={}", timeline.name(), time.as_i64()),
format!("{}={}", timeline, cell.as_i64()),
]);
}
re_log_types::TimeType::Sequence => {
args.extend([
"--sequence".to_owned(),
format!("{}={}", timeline.name(), time.as_i64()),
format!("{}={}", timeline, cell.as_i64()),
]);
}
}
Expand Down
21 changes: 12 additions & 9 deletions crates/store/re_data_loader/src/loader_archetype.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use itertools::Either;

use re_chunk::{Chunk, RowId};
use re_log_types::{EntityPath, TimeInt, TimePoint};
use re_log_types::{EntityPath, TimePoint};
use re_types::archetypes::{AssetVideo, VideoFrameReference};
use re_types::components::VideoTimestamp;
use re_types::Archetype;
Expand Down Expand Up @@ -67,28 +67,28 @@ impl DataLoader for ArchetypeLoader {
// TODO(cmc): log these once heuristics (I think?) are fixed
if false {
if let Ok(metadata) = filepath.metadata() {
use re_log_types::{Time, Timeline};
use re_log_types::IndexCell;

if let Some(created) = metadata
.created()
.ok()
.and_then(|t| TimeInt::try_from(Time::try_from(t).ok()?).ok())
.and_then(|t| IndexCell::try_from(t).ok())
{
timepoint.insert(Timeline::new_temporal("created_at"), created);
timepoint.insert_index("created_at", created);
}
if let Some(modified) = metadata
.modified()
.ok()
.and_then(|t| TimeInt::try_from(Time::try_from(t).ok()?).ok())
.and_then(|t| IndexCell::try_from(t).ok())
{
timepoint.insert(Timeline::new_temporal("modified_at"), modified);
timepoint.insert_index("modified_at", modified);
}
if let Some(accessed) = metadata
.accessed()
.ok()
.and_then(|t| TimeInt::try_from(Time::try_from(t).ok()?).ok())
.and_then(|t| IndexCell::try_from(t).ok())
{
timepoint.insert(Timeline::new_temporal("accessed_at"), accessed);
timepoint.insert_index("accessed_at", accessed);
}
}
}
Expand Down Expand Up @@ -184,7 +184,10 @@ fn load_video(
re_tracing::profile_function!();

let video_timeline = re_log_types::Timeline::new_temporal("video");
timepoint.insert(video_timeline, re_log_types::TimeInt::new_temporal(0));
timepoint.insert_index(
*video_timeline.name(),
re_log_types::IndexCell::ZERO_DURATION,
);

let video_asset = AssetVideo::new(contents);

Expand Down
21 changes: 7 additions & 14 deletions crates/store/re_data_loader/src/loader_lerobot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ fn log_episode_task(
continue;
};

let mut timepoint = TimePoint::default();
timepoint.insert(*timeline, time_int);
let timepoint = TimePoint::default().with(*timeline, time_int);
let text = TextDocument::new(task.task.clone());
chunk = chunk.with_archetype(row_id, timepoint, &text);

Expand All @@ -290,17 +289,14 @@ fn load_episode_images(

let mut chunk = Chunk::builder(observation.into());
let mut row_id = RowId::new();
let mut time_int = TimeInt::ZERO;

for idx in 0..image_bytes.len() {
let img_buffer = image_bytes.value(idx);
for frame_idx in 0..image_bytes.len() {
let img_buffer = image_bytes.value(frame_idx);
let encoded_image = EncodedImage::from_file_contents(img_buffer.to_owned());
let mut timepoint = TimePoint::default();
timepoint.insert(*timeline, time_int);
let timepoint = TimePoint::default().with(*timeline, frame_idx as i64);
chunk = chunk.with_archetype(row_id, timepoint, &encoded_image);

row_id = row_id.next();
time_int = time_int.inc();
}

Ok(std::iter::once(chunk.build().with_context(|| {
Expand All @@ -322,19 +318,16 @@ fn load_episode_depth_images(

let mut chunk = Chunk::builder(observation.into());
let mut row_id = RowId::new();
let mut time_int = TimeInt::ZERO;

for idx in 0..image_bytes.len() {
let img_buffer = image_bytes.value(idx);
for frame_idx in 0..image_bytes.len() {
let img_buffer = image_bytes.value(frame_idx);
let depth_image = DepthImage::from_file_contents(img_buffer.to_owned())
.map_err(|err| anyhow!("Failed to decode image: {err}"))?;

let mut timepoint = TimePoint::default();
timepoint.insert(*timeline, time_int);
let timepoint = TimePoint::default().with(*timeline, frame_idx as i64);
chunk = chunk.with_archetype(row_id, timepoint, &depth_image);

row_id = row_id.next();
time_int = time_int.inc();
}

Ok(std::iter::once(chunk.build().with_context(|| {
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub use self::external::re_chunk_store::{
};
#[doc(no_inline)]
pub use self::external::re_log_types::{
EntityPath, EntityPathFilter, EntityPathSubs, ResolvedEntityPathFilter, ResolvedTimeRange,
StoreKind, TimeInt, Timeline, TimelineName,
EntityPath, EntityPathFilter, EntityPathSubs, IndexCell, ResolvedEntityPathFilter,
ResolvedTimeRange, StoreKind, TimeInt, Timeline, TimelineName,
};
#[doc(no_inline)]
pub use self::external::re_query::{QueryCache, QueryCacheHandle, StorageEngine};
Expand Down
Loading