Skip to content

Add rerun rrd verify #9128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 4, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ fn generate_component_reflection(
docstring_md: #docstring_md,
custom_placeholder: #custom_placeholder,
datatype: #type_name::arrow_datatype(),
verify_arrow_array: #type_name::verify_arrow_array,
}
};
quoted_pairs.push(quote! { (#quoted_name, #quoted_reflection) });
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
return self.next();
}

re_log::debug!("Reached end of stream, iterator complete");
re_log::trace!("Reached end of stream, iterator complete");
return None;
};

Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/decoder/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl<R: AsyncBufRead + Unpin> Stream for StreamingDecoder<R> {
continue;
}

re_log::debug!("Reached end of stream, iterator complete");
re_log::trace!("Reached end of stream, iterator complete");
return std::task::Poll::Ready(None);
};

Expand Down
4 changes: 3 additions & 1 deletion crates/store/re_log_encoding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ pub enum OptionsError {
UnknownCompression(u8),

// TODO(jan): Remove this at some point, realistically 1-2 releases after 0.23
#[error("Attempted to use the removed MsgPack serializer, which is no longer supported")]
#[error(
"You are trying to load an old .rrd file that's not supported by this version of Rerun."
)]
RemovedMsgPackSerializer,

#[error("Unknown serializer: {0}")]
Expand Down
109 changes: 109 additions & 0 deletions crates/store/re_types/src/reflection/mod.rs

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions crates/store/re_types_core/src/loggable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub trait Loggable: 'static + Send + Sync + Clone + Sized + SizeBytes {
) -> crate::DeserializationResult<Vec<Option<Self>>> {
Self::from_arrow(data).map(|v| v.into_iter().map(Some).collect())
}

/// Verifies that the given Arrow array can be deserialized into a collection of [`Self`]s.
///
/// Calls [`Self::from_arrow`] and returns an error if it fails.
fn verify_arrow_array(data: &dyn arrow::array::Array) -> crate::DeserializationResult<()> {
Self::from_arrow(data).map(|_| ())
}
}

/// A [`Component`] describes semantic data that can be used by any number of [`Archetype`]s.
Expand Down
7 changes: 7 additions & 0 deletions crates/store/re_types_core/src/reflection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ pub struct ComponentReflection {

/// Datatype of the component.
pub datatype: arrow::datatypes::DataType,

/// Checks that the given Arrow array can be deserialized into a collection of [`Self`]s.
pub verify_arrow_array: fn(&dyn arrow::array::Array) -> crate::DeserializationResult<()>,
}

/// Runtime reflection about archetypes.
Expand Down Expand Up @@ -271,6 +274,10 @@ impl ArchetypeReflection {
pub fn required_fields(&self) -> impl Iterator<Item = &ArchetypeFieldReflection> {
self.fields.iter().filter(|field| field.is_required)
}

pub fn get_field(&self, field_name: &str) -> Option<&ArchetypeFieldReflection> {
self.fields.iter().find(|field| field.name == field_name)
}
}

/// Additional information about an archetype's field.
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/rrd/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl FilterCommand {
Ok(size_bytes)
});

for res in rx_decoder {
for (_source, res) in rx_decoder {
let mut is_success = true;

match res {
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/rrd/merge_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn merge_and_compact(

let mut entity_dbs: std::collections::HashMap<StoreId, EntityDb> = Default::default();

for res in rx {
for (_source, res) in rx {
let mut is_success = true;

match res {
Expand Down
8 changes: 8 additions & 0 deletions crates/top/rerun/src/commands/rrd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ mod compare;
mod filter;
mod merge_compact;
mod print;
mod verify;

use self::compare::CompareCommand;
use self::filter::FilterCommand;
use self::merge_compact::{CompactCommand, MergeCommand};
use self::print::PrintCommand;
use self::verify::VerifyCommand;

// ---

Expand All @@ -29,6 +31,11 @@ pub enum RrdCommands {
/// Example: `rerun rrd print /my/recordings/*.rrd`
Print(PrintCommand),

/// Verify the that the .rrd file can be loaded and correctly interpreted.
///
/// Can be used to ensure that the current Rerun version can load the data.
Verify(VerifyCommand),

/// Compacts the contents of one or more .rrd/.rbl files/streams and writes the result standard output.
///
/// Reads from standard input if no paths are specified.
Expand Down Expand Up @@ -76,6 +83,7 @@ impl RrdCommands {
.with_context(|| format!("current directory {:?}", std::env::current_dir()))
}
Self::Print(print_command) => print_command.run(),
Self::Verify(verify_command) => verify_command.run(),
Self::Compact(compact_command) => compact_command.run(),
Self::Merge(merge_command) => merge_command.run(),
Self::Filter(drop_command) => drop_command.run(),
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/rrd/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl PrintCommand {
let version_policy = re_log_encoding::VersionPolicy::Warn;
let (rx, _) = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);

for res in rx {
for (_source, res) in rx {
let mut is_success = true;

match res {
Expand Down
174 changes: 174 additions & 0 deletions crates/top/rerun/src/commands/rrd/verify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use std::collections::HashSet;

use arrow::array::AsArray as _;

use re_log_types::LogMsg;
use re_types::reflection::Reflection;

use crate::commands::read_rrd_streams_from_file_or_stdin;

// ---

#[derive(Debug, Clone, clap::Parser)]
pub struct VerifyCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,
}

impl VerifyCommand {
pub fn run(&self) -> anyhow::Result<()> {
let mut verifier = Verifier::new()?;

let Self { path_to_input_rrds } = self;

// TODO(cmc): might want to make this configurable at some point.
let version_policy = re_log_encoding::VersionPolicy::Warn;
let (rx, _) = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);

let mut seen_files = std::collections::HashSet::new();

for (source, res) in rx {
verifier.verify_log_msg(&source.to_string(), res?);
seen_files.insert(source);
}

if verifier.errors.is_empty() {
if seen_files.len() == 1 {
eprintln!("1 file verified without error.");
} else {
eprintln!("{} files verified without error.", seen_files.len());
}
Ok(())
} else {
for err in &verifier.errors {
eprintln!("{err}");
}
Err(anyhow::anyhow!(
"Verification failed with {} errors",
verifier.errors.len()
))
}
}
}

struct Verifier {
reflection: Reflection,
errors: HashSet<String>,
}

impl Verifier {
fn new() -> anyhow::Result<Self> {
Ok(Self {
reflection: re_types::reflection::generate_reflection()?,
errors: HashSet::new(),
})
}

fn verify_log_msg(&mut self, source: &str, msg: LogMsg) {
match msg {
LogMsg::SetStoreInfo { .. } | LogMsg::BlueprintActivationCommand { .. } => {}

LogMsg::ArrowMsg(_store_id, arrow_msg) => {
self.verify_record_batch(source, &arrow_msg.batch);
}
}
}

fn verify_record_batch(&mut self, source: &str, batch: &arrow::array::RecordBatch) {
match re_sorbet::ChunkBatch::try_from(batch) {
Ok(chunk_batch) => self.verify_chunk_batch(source, &chunk_batch),
Err(err) => {
self.errors
.insert(format!("{source}: Failed to parse batch: {err}"));
}
}
}

fn verify_chunk_batch(&mut self, source: &str, chunk_batch: &re_sorbet::ChunkBatch) {
for (component_descriptor, column) in chunk_batch.component_columns() {
if let Err(err) = self.verify_component_column(component_descriptor, column) {
self.errors.insert(format!(
"{source}: Failed to deserialize column {}: {}",
component_descriptor.component_name,
re_error::format(err)
));
}
}
}

fn verify_component_column(
&self,
component_descriptor: &re_sorbet::ComponentColumnDescriptor,
column: &dyn arrow::array::Array,
) -> anyhow::Result<()> {
let re_sorbet::ComponentColumnDescriptor {
component_name,
archetype_name,
archetype_field_name,
..
} = component_descriptor;

if !component_name.full_name().starts_with("rerun.") {
re_log::debug_once!("Ignoring non-Rerun component {component_name:?}");
return Ok(());
}

if component_name.is_indicator_component() {
// Lacks reflection and data
} else {
// Verify data
let component_reflection = self
.reflection
.components
.get(component_name)
.ok_or_else(|| anyhow::anyhow!("Unknown component"))?;

let list_array = column.as_list_opt::<i32>().ok_or_else(|| {
anyhow::anyhow!("Expected list array, found {:?}", column.data_type())
})?;

assert_eq!(column.len() + 1, list_array.offsets().len());

for i in 0..column.len() {
let cell = list_array.value(i);
(component_reflection.verify_arrow_array)(cell.as_ref())?;
}
}

if let Some(archetype_name) = archetype_name {
if archetype_name.full_name().starts_with("rerun.") {
// Verify archetype.
// We may want to have a flag to allow some of this?
let archetype_reflection = self
.reflection
.archetypes
.get(archetype_name)
.ok_or_else(|| anyhow::anyhow!("Unknown archetype: {archetype_name:?}"))?;

if let Some(archetype_field_name) = archetype_field_name {
// Verify archetype field.
// We may want to have a flag to allow some of this?
let archetype_field_reflection = archetype_reflection
.get_field(archetype_field_name)
.ok_or_else(|| {
anyhow::anyhow!(
"Input column referred to the archetype field name {archetype_field_name:?} of {archetype_name:?}, which only has the fields: {:?}",
archetype_reflection.fields.iter().map(|field| field.name)
)
})?;

let expected_component_name = &archetype_field_reflection.component_name;
if component_name != expected_component_name {
return Err(anyhow::anyhow!(
"Archetype field {archetype_field_name:?} of {archetype_name:?} has component {expected_component_name:?} in this version of Rerun, but the data column has component {component_name:?}"
));
}
}
} else {
re_log::debug_once!("Ignoring non-Rerun archetype {archetype_name:?}");
}
}

Ok(())
}
}
29 changes: 23 additions & 6 deletions crates/top/rerun/src/commands/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ use re_log_types::LogMsg;

// ---

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum InputSource {
Stdin,
File(PathBuf),
}

impl std::fmt::Display for InputSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stdin => write!(f, "stdin"),
Self::File(path) => write!(f, "{path:?}"),
}
}
}

/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or standard
/// input if none are specified.
///
Expand All @@ -26,7 +41,7 @@ pub fn read_rrd_streams_from_file_or_stdin(
version_policy: re_log_encoding::VersionPolicy,
paths: &[String],
) -> (
channel::Receiver<anyhow::Result<LogMsg>>,
channel::Receiver<(InputSource, anyhow::Result<LogMsg>)>,
channel::Receiver<u64>,
) {
let path_to_input_rrds = paths.iter().map(PathBuf::from).collect_vec();
Expand All @@ -53,14 +68,14 @@ pub fn read_rrd_streams_from_file_or_stdin(
{
Ok(decoder) => decoder,
Err(err) => {
tx.send(Err(err)).ok();
tx.send((InputSource::Stdin, Err(err))).ok();
return;
}
};

for res in &mut decoder {
let res = res.context("couldn't decode message from stdin -- skipping");
tx.send(res).ok();
tx.send((InputSource::Stdin, res)).ok();
}

size_bytes += decoder.size_bytes();
Expand All @@ -73,7 +88,8 @@ pub fn read_rrd_streams_from_file_or_stdin(
{
Ok(file) => file,
Err(err) => {
tx.send(Err(err)).ok();
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
.ok();
continue;
}
};
Expand All @@ -84,7 +100,8 @@ pub fn read_rrd_streams_from_file_or_stdin(
{
Ok(decoder) => decoder,
Err(err) => {
tx.send(Err(err)).ok();
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
.ok();
continue;
}
};
Expand All @@ -93,7 +110,7 @@ pub fn read_rrd_streams_from_file_or_stdin(
let res = res.context("decode rrd message").with_context(|| {
format!("couldn't decode message {rrd_path:?} -- skipping")
});
tx.send(res).ok();
tx.send((InputSource::File(rrd_path.clone()), res)).ok();
}

size_bytes += decoder.size_bytes();
Expand Down
Loading