Skip to content

Commit a4a3bf8

Browse files
authored
Add rerun rrd verify (#9128)
### Related: Part of #9110. ### What: Adds `rerun rrd verify some.rrd`, which verifies that the current Rerun version can load and understand the given .rrd file. It goes through each component column in each record batch, finds the corresponding component, and then tries to deserialize the Arrow data within.
1 parent 65c1cd7 commit a4a3bf8

File tree

13 files changed

+337
-12
lines changed

13 files changed

+337
-12
lines changed

crates/build/re_types_builder/src/codegen/rust/reflection.rs

+1
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ fn generate_component_reflection(
147147
docstring_md: #docstring_md,
148148
custom_placeholder: #custom_placeholder,
149149
datatype: #type_name::arrow_datatype(),
150+
verify_arrow_array: #type_name::verify_arrow_array,
150151
}
151152
};
152153
quoted_pairs.push(quote! { (#quoted_name, #quoted_reflection) });

crates/store/re_log_encoding/src/decoder/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
310310
return self.next();
311311
}
312312

313-
re_log::debug!("Reached end of stream, iterator complete");
313+
re_log::trace!("Reached end of stream, iterator complete");
314314
return None;
315315
};
316316

crates/store/re_log_encoding/src/decoder/streaming.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ impl<R: AsyncBufRead + Unpin> Stream for StreamingDecoder<R> {
209209
continue;
210210
}
211211

212-
re_log::debug!("Reached end of stream, iterator complete");
212+
re_log::trace!("Reached end of stream, iterator complete");
213213
return std::task::Poll::Ready(None);
214214
};
215215

crates/store/re_log_encoding/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ pub enum OptionsError {
123123
UnknownCompression(u8),
124124

125125
// TODO(jan): Remove this at some point, realistically 1-2 releases after 0.23
126-
#[error("Attempted to use the removed MsgPack serializer, which is no longer supported")]
126+
#[error(
127+
"You are trying to load an old .rrd file that's not supported by this version of Rerun."
128+
)]
127129
RemovedMsgPackSerializer,
128130

129131
#[error("Unknown serializer: {0}")]

crates/store/re_types/src/reflection/mod.rs

+109
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/store/re_types_core/src/loggable.rs

+7
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ pub trait Loggable: 'static + Send + Sync + Clone + Sized + SizeBytes {
7575
) -> crate::DeserializationResult<Vec<Option<Self>>> {
7676
Self::from_arrow(data).map(|v| v.into_iter().map(Some).collect())
7777
}
78+
79+
/// Verifies that the given Arrow array can be deserialized into a collection of [`Self`]s.
80+
///
81+
/// Calls [`Self::from_arrow`] and returns an error if it fails.
82+
fn verify_arrow_array(data: &dyn arrow::array::Array) -> crate::DeserializationResult<()> {
83+
Self::from_arrow(data).map(|_| ())
84+
}
7885
}
7986

8087
/// A [`Component`] describes semantic data that can be used by any number of [`Archetype`]s.

crates/store/re_types_core/src/reflection.rs

+7
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ pub struct ComponentReflection {
240240

241241
/// Datatype of the component.
242242
pub datatype: arrow::datatypes::DataType,
243+
244+
/// Checks that the given Arrow array can be deserialized into a collection of [`Self`]s.
245+
pub verify_arrow_array: fn(&dyn arrow::array::Array) -> crate::DeserializationResult<()>,
243246
}
244247

245248
/// Runtime reflection about archetypes.
@@ -271,6 +274,10 @@ impl ArchetypeReflection {
271274
pub fn required_fields(&self) -> impl Iterator<Item = &ArchetypeFieldReflection> {
272275
self.fields.iter().filter(|field| field.is_required)
273276
}
277+
278+
/// Returns the first field of this archetype whose name equals `field_name`,
/// or `None` if no field has that name.
pub fn get_field(&self, field_name: &str) -> Option<&ArchetypeFieldReflection> {
    for field in self.fields.iter() {
        if field.name == field_name {
            return Some(field);
        }
    }
    None
}
274281
}
275282

276283
/// Additional information about an archetype's field.

crates/top/rerun/src/commands/rrd/filter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl FilterCommand {
104104
Ok(size_bytes)
105105
});
106106

107-
for res in rx_decoder {
107+
for (_source, res) in rx_decoder {
108108
let mut is_success = true;
109109

110110
match res {

crates/top/rerun/src/commands/rrd/merge_compact.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ fn merge_and_compact(
163163

164164
let mut entity_dbs: std::collections::HashMap<StoreId, EntityDb> = Default::default();
165165

166-
for res in rx {
166+
for (_source, res) in rx {
167167
let mut is_success = true;
168168

169169
match res {

crates/top/rerun/src/commands/rrd/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ mod compare;
22
mod filter;
33
mod merge_compact;
44
mod print;
5+
mod verify;
56

67
use self::compare::CompareCommand;
78
use self::filter::FilterCommand;
89
use self::merge_compact::{CompactCommand, MergeCommand};
910
use self::print::PrintCommand;
11+
use self::verify::VerifyCommand;
1012

1113
// ---
1214

@@ -29,6 +31,11 @@ pub enum RrdCommands {
2931
/// Example: `rerun rrd print /my/recordings/*.rrd`
3032
Print(PrintCommand),
3133

34+
/// Verify that the .rrd file can be loaded and correctly interpreted.
35+
///
36+
/// Can be used to ensure that the current Rerun version can load the data.
37+
Verify(VerifyCommand),
38+
3239
/// Compacts the contents of one or more .rrd/.rbl files/streams and writes the result standard output.
3340
///
3441
/// Reads from standard input if no paths are specified.
@@ -76,6 +83,7 @@ impl RrdCommands {
7683
.with_context(|| format!("current directory {:?}", std::env::current_dir()))
7784
}
7885
Self::Print(print_command) => print_command.run(),
86+
Self::Verify(verify_command) => verify_command.run(),
7987
Self::Compact(compact_command) => compact_command.run(),
8088
Self::Merge(merge_command) => merge_command.run(),
8189
Self::Filter(drop_command) => drop_command.run(),

crates/top/rerun/src/commands/rrd/print.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl PrintCommand {
4444
let version_policy = re_log_encoding::VersionPolicy::Warn;
4545
let (rx, _) = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);
4646

47-
for res in rx {
47+
for (_source, res) in rx {
4848
let mut is_success = true;
4949

5050
match res {
crates/top/rerun/src/commands/rrd/verify.rs

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use std::collections::HashSet;
2+
3+
use arrow::array::AsArray as _;
4+
5+
use re_log_types::LogMsg;
6+
use re_types::reflection::Reflection;
7+
8+
use crate::commands::read_rrd_streams_from_file_or_stdin;
9+
10+
// ---
11+
12+
#[derive(Debug, Clone, clap::Parser)]
13+
pub struct VerifyCommand {
14+
/// Paths to read from. Reads from standard input if none are specified.
15+
path_to_input_rrds: Vec<String>,
16+
}
17+
18+
impl VerifyCommand {
19+
pub fn run(&self) -> anyhow::Result<()> {
20+
let mut verifier = Verifier::new()?;
21+
22+
let Self { path_to_input_rrds } = self;
23+
24+
// TODO(cmc): might want to make this configurable at some point.
25+
let version_policy = re_log_encoding::VersionPolicy::Warn;
26+
let (rx, _) = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);
27+
28+
let mut seen_files = std::collections::HashSet::new();
29+
30+
for (source, res) in rx {
31+
verifier.verify_log_msg(&source.to_string(), res?);
32+
seen_files.insert(source);
33+
}
34+
35+
if verifier.errors.is_empty() {
36+
if seen_files.len() == 1 {
37+
eprintln!("1 file verified without error.");
38+
} else {
39+
eprintln!("{} files verified without error.", seen_files.len());
40+
}
41+
Ok(())
42+
} else {
43+
for err in &verifier.errors {
44+
eprintln!("{err}");
45+
}
46+
Err(anyhow::anyhow!(
47+
"Verification failed with {} errors",
48+
verifier.errors.len()
49+
))
50+
}
51+
}
52+
}
53+
54+
struct Verifier {
55+
reflection: Reflection,
56+
errors: HashSet<String>,
57+
}
58+
59+
impl Verifier {
60+
fn new() -> anyhow::Result<Self> {
61+
Ok(Self {
62+
reflection: re_types::reflection::generate_reflection()?,
63+
errors: HashSet::new(),
64+
})
65+
}
66+
67+
fn verify_log_msg(&mut self, source: &str, msg: LogMsg) {
68+
match msg {
69+
LogMsg::SetStoreInfo { .. } | LogMsg::BlueprintActivationCommand { .. } => {}
70+
71+
LogMsg::ArrowMsg(_store_id, arrow_msg) => {
72+
self.verify_record_batch(source, &arrow_msg.batch);
73+
}
74+
}
75+
}
76+
77+
fn verify_record_batch(&mut self, source: &str, batch: &arrow::array::RecordBatch) {
78+
match re_sorbet::ChunkBatch::try_from(batch) {
79+
Ok(chunk_batch) => self.verify_chunk_batch(source, &chunk_batch),
80+
Err(err) => {
81+
self.errors
82+
.insert(format!("{source}: Failed to parse batch: {err}"));
83+
}
84+
}
85+
}
86+
87+
fn verify_chunk_batch(&mut self, source: &str, chunk_batch: &re_sorbet::ChunkBatch) {
88+
for (component_descriptor, column) in chunk_batch.component_columns() {
89+
if let Err(err) = self.verify_component_column(component_descriptor, column) {
90+
self.errors.insert(format!(
91+
"{source}: Failed to deserialize column {}: {}",
92+
component_descriptor.component_name,
93+
re_error::format(err)
94+
));
95+
}
96+
}
97+
}
98+
99+
fn verify_component_column(
100+
&self,
101+
component_descriptor: &re_sorbet::ComponentColumnDescriptor,
102+
column: &dyn arrow::array::Array,
103+
) -> anyhow::Result<()> {
104+
let re_sorbet::ComponentColumnDescriptor {
105+
component_name,
106+
archetype_name,
107+
archetype_field_name,
108+
..
109+
} = component_descriptor;
110+
111+
if !component_name.full_name().starts_with("rerun.") {
112+
re_log::debug_once!("Ignoring non-Rerun component {component_name:?}");
113+
return Ok(());
114+
}
115+
116+
if component_name.is_indicator_component() {
117+
// Lacks reflection and data
118+
} else {
119+
// Verify data
120+
let component_reflection = self
121+
.reflection
122+
.components
123+
.get(component_name)
124+
.ok_or_else(|| anyhow::anyhow!("Unknown component"))?;
125+
126+
let list_array = column.as_list_opt::<i32>().ok_or_else(|| {
127+
anyhow::anyhow!("Expected list array, found {:?}", column.data_type())
128+
})?;
129+
130+
assert_eq!(column.len() + 1, list_array.offsets().len());
131+
132+
for i in 0..column.len() {
133+
let cell = list_array.value(i);
134+
(component_reflection.verify_arrow_array)(cell.as_ref())?;
135+
}
136+
}
137+
138+
if let Some(archetype_name) = archetype_name {
139+
if archetype_name.full_name().starts_with("rerun.") {
140+
// Verify archetype.
141+
// We may want to have a flag to allow some of this?
142+
let archetype_reflection = self
143+
.reflection
144+
.archetypes
145+
.get(archetype_name)
146+
.ok_or_else(|| anyhow::anyhow!("Unknown archetype: {archetype_name:?}"))?;
147+
148+
if let Some(archetype_field_name) = archetype_field_name {
149+
// Verify archetype field.
150+
// We may want to have a flag to allow some of this?
151+
let archetype_field_reflection = archetype_reflection
152+
.get_field(archetype_field_name)
153+
.ok_or_else(|| {
154+
anyhow::anyhow!(
155+
"Input column referred to the archetype field name {archetype_field_name:?} of {archetype_name:?}, which only has the fields: {:?}",
156+
archetype_reflection.fields.iter().map(|field| field.name)
157+
)
158+
})?;
159+
160+
let expected_component_name = &archetype_field_reflection.component_name;
161+
if component_name != expected_component_name {
162+
return Err(anyhow::anyhow!(
163+
"Archetype field {archetype_field_name:?} of {archetype_name:?} has component {expected_component_name:?} in this version of Rerun, but the data column has component {component_name:?}"
164+
));
165+
}
166+
}
167+
} else {
168+
re_log::debug_once!("Ignoring non-Rerun archetype {archetype_name:?}");
169+
}
170+
}
171+
172+
Ok(())
173+
}
174+
}

crates/top/rerun/src/commands/stdio.rs

+23-6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,21 @@ use re_log_types::LogMsg;
99

1010
// ---
1111

12+
/// Where a decoded message (or a decoding error) came from.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum InputSource {
    /// The data was read from standard input.
    Stdin,

    /// The data was read from the file at the given path.
    File(PathBuf),
}

impl std::fmt::Display for InputSource {
    /// Writes `stdin` for [`InputSource::Stdin`], and the debug representation of the path
    /// (quoted and escaped) for [`InputSource::File`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if let Self::File(path) = self {
            return write!(f, "{path:?}");
        }
        f.write_str("stdin")
    }
}
26+
1227
/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or standard
1328
/// input if none are specified.
1429
///
@@ -26,7 +41,7 @@ pub fn read_rrd_streams_from_file_or_stdin(
2641
version_policy: re_log_encoding::VersionPolicy,
2742
paths: &[String],
2843
) -> (
29-
channel::Receiver<anyhow::Result<LogMsg>>,
44+
channel::Receiver<(InputSource, anyhow::Result<LogMsg>)>,
3045
channel::Receiver<u64>,
3146
) {
3247
let path_to_input_rrds = paths.iter().map(PathBuf::from).collect_vec();
@@ -53,14 +68,14 @@ pub fn read_rrd_streams_from_file_or_stdin(
5368
{
5469
Ok(decoder) => decoder,
5570
Err(err) => {
56-
tx.send(Err(err)).ok();
71+
tx.send((InputSource::Stdin, Err(err))).ok();
5772
return;
5873
}
5974
};
6075

6176
for res in &mut decoder {
6277
let res = res.context("couldn't decode message from stdin -- skipping");
63-
tx.send(res).ok();
78+
tx.send((InputSource::Stdin, res)).ok();
6479
}
6580

6681
size_bytes += decoder.size_bytes();
@@ -73,7 +88,8 @@ pub fn read_rrd_streams_from_file_or_stdin(
7388
{
7489
Ok(file) => file,
7590
Err(err) => {
76-
tx.send(Err(err)).ok();
91+
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
92+
.ok();
7793
continue;
7894
}
7995
};
@@ -84,7 +100,8 @@ pub fn read_rrd_streams_from_file_or_stdin(
84100
{
85101
Ok(decoder) => decoder,
86102
Err(err) => {
87-
tx.send(Err(err)).ok();
103+
tx.send((InputSource::File(rrd_path.clone()), Err(err)))
104+
.ok();
88105
continue;
89106
}
90107
};
@@ -93,7 +110,7 @@ pub fn read_rrd_streams_from_file_or_stdin(
93110
let res = res.context("decode rrd message").with_context(|| {
94111
format!("couldn't decode message {rrd_path:?} -- skipping")
95112
});
96-
tx.send(res).ok();
113+
tx.send((InputSource::File(rrd_path.clone()), res)).ok();
97114
}
98115

99116
size_bytes += decoder.size_bytes();

0 commit comments

Comments
 (0)