Skip to content

New types and traits for (co-existing!) eager serialization #8642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits on Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 56 additions & 9 deletions crates/store/re_chunk/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use itertools::Itertools;
use nohash_hasher::IntMap;

use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};

use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

Expand Down Expand Up @@ -121,14 +121,8 @@ impl ChunkBuilder {
timepoint: impl Into<TimePoint>,
as_components: &dyn AsComponents,
) -> Self {
let batches = as_components.as_component_batches();
self.with_component_batches(
row_id,
timepoint,
batches
.iter()
.map(|batch| batch as &dyn re_types_core::ComponentBatch),
)
let batches = as_components.as_serialized_batches();
self.with_serialized_batches(row_id, timepoint, batches)
}

/// Add a row's worth of data by serializing a single [`ComponentBatch`].
Expand Down Expand Up @@ -193,6 +187,59 @@ impl ChunkBuilder {
)
}

/// Add a row's worth of data by serializing a single [`ComponentBatch`].
///
/// Convenience wrapper around [`Self::with_serialized_batches`] for the
/// one-batch case; behavior is otherwise identical.
#[inline]
pub fn with_serialized_batch(
    self,
    row_id: RowId,
    timepoint: impl Into<TimePoint>,
    component_batch: SerializedComponentBatch,
) -> Self {
    // A single batch is just the one-element case of the plural API.
    self.with_serialized_batches(row_id, timepoint, [component_batch])
}

/// Add a row's worth of data by serializing many [`ComponentBatch`]es.
#[inline]
pub fn with_serialized_batches(
    self,
    row_id: RowId,
    timepoint: impl Into<TimePoint>,
    component_batches: impl IntoIterator<Item = SerializedComponentBatch>,
) -> Self {
    // Split every serialized batch into the `(descriptor, array)` pair
    // that `with_row` expects for a dense row.
    let columns = component_batches
        .into_iter()
        .map(|batch| (batch.descriptor, batch.array));

    self.with_row(row_id, timepoint, columns)
}

/// Add a row's worth of data by serializing many sparse [`ComponentBatch`]es.
///
/// Each entry pairs a [`ComponentDescriptor`] with an optional batch; a
/// `None` batch is forwarded as a `None` column value for this row.
#[inline]
pub fn with_sparse_serialized_batches(
    self,
    row_id: RowId,
    timepoint: impl Into<TimePoint>,
    component_batches: impl IntoIterator<
        Item = (ComponentDescriptor, Option<SerializedComponentBatch>),
    >,
) -> Self {
    // Keep the descriptor unconditionally; only extract the underlying
    // array when a batch is actually present.
    let columns = component_batches
        .into_iter()
        .map(|(descriptor, maybe_batch)| {
            let maybe_array = maybe_batch.map(|batch| batch.array);
            (descriptor, maybe_array)
        });

    self.with_sparse_row(row_id, timepoint, columns)
}

/// Builds and returns the final [`Chunk`].
///
/// The arrow datatype of each individual column will be guessed by inspecting the data.
Expand Down
32 changes: 6 additions & 26 deletions crates/store/re_entity_db/tests/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,7 @@ fn clears() -> anyhow::Result<()> {
let timepoint = TimePoint::from_iter([(timeline_frame, 10)]);
let clear = Clear::flat();
let chunk = Chunk::builder(entity_path_parent.clone())
.with_component_batches(
row_id,
timepoint,
clear
.as_component_batches()
.iter()
.map(|b| b as &dyn re_types_core::ComponentBatch),
)
.with_serialized_batches(row_id, timepoint, clear.as_serialized_batches())
.build()?;

db.add_chunk(&Arc::new(chunk))?;
Expand Down Expand Up @@ -163,14 +156,7 @@ fn clears() -> anyhow::Result<()> {
let timepoint = TimePoint::from_iter([(timeline_frame, 10)]);
let clear = Clear::recursive();
let chunk = Chunk::builder(entity_path_parent.clone())
.with_component_batches(
row_id,
timepoint,
clear
.as_component_batches()
.iter()
.map(|b| b as &dyn re_types_core::ComponentBatch),
)
.with_serialized_batches(row_id, timepoint, clear.as_serialized_batches())
.build()?;

db.add_chunk(&Arc::new(chunk))?;
Expand Down Expand Up @@ -351,13 +337,10 @@ fn clears_respect_index_order() -> anyhow::Result<()> {

let clear = Clear::recursive();
let chunk = Chunk::builder(entity_path.clone())
.with_component_batches(
.with_serialized_batches(
row_id1, // older row id!
timepoint.clone(),
clear
.as_component_batches()
.iter()
.map(|b| b as &dyn re_types_core::ComponentBatch),
clear.as_serialized_batches(),
)
.build()?;

Expand All @@ -378,13 +361,10 @@ fn clears_respect_index_order() -> anyhow::Result<()> {

let clear = Clear::recursive();
let chunk = Chunk::builder(entity_path.clone())
.with_component_batches(
.with_serialized_batches(
row_id3, // newer row id!
timepoint.clone(),
clear
.as_component_batches()
.iter()
.map(|b| b as &dyn re_types_core::ComponentBatch),
clear.as_serialized_batches(),
)
.build()?;

Expand Down
Loading
Loading