
Commit 166068c

Fix Python bridge losing descriptor/tags information during transit (#8786)
Not sure how it got that way, but this fixes it and generally simplifies the pipeline in any case.
1 parent c105e19 · commit 166068c
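
The gist of the change: `array_to_rust` no longer fabricates an `ArrowField` whose name and metadata were all that survived the Python→Rust hop; instead the full `ComponentDescriptor` is kept next to the plain Arrow array and used directly as the map key. A minimal self-contained sketch of that idea, not rerun's actual code: a plain `HashMap` stands in for rerun's `IntMap`, a placeholder string stands in for the `ArrowArrayRef`, and the descriptor fields are the ones visible in the removed video.rs code below.

use std::collections::HashMap;

// Stand-in for re_sdk::ComponentDescriptor, with the fields seen in the diff below.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ComponentDescriptor {
    archetype_name: Option<String>,
    archetype_field_name: Option<String>,
    component_name: String,
}

fn main() {
    let descr = ComponentDescriptor {
        archetype_name: Some("rerun.archetypes.AssetVideo".into()),
        archetype_field_name: Some("blob".into()),
        component_name: "rerun.components.Blob".into(),
    };

    // Keep the whole descriptor as the key next to the (placeholder) array.
    // Collapsing it into a single Arrow field name is what used to lose the
    // archetype/field/tag information during transit.
    let mut components: HashMap<ComponentDescriptor, &str> = HashMap::new();
    components.insert(descr.clone(), "<ArrowArrayRef placeholder>");

    assert!(components.contains_key(&descr));
}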

2 files changed: +42 −61 lines


rerun_py/src/arrow.rs (+41 −54)

@@ -18,7 +18,7 @@ use pyo3::{
 };
 
 use re_arrow_util::ArrowArrayDowncastRef as _;
-use re_chunk::{Chunk, ChunkError, ChunkId, PendingRow, RowId, TimeColumn, TransportChunk};
+use re_chunk::{Chunk, ChunkError, ChunkId, PendingRow, RowId, TimeColumn, TimelineName};
 use re_log_types::TimePoint;
 use re_sdk::{external::nohash_hasher::IntMap, ComponentDescriptor, EntityPath, Timeline};
 
@@ -54,23 +54,9 @@ pub fn descriptor_to_rust(component_descr: &Bound<'_, PyAny>) -> PyResult<Compon
 /// Perform conversion between a pyarrow array to arrow types.
 ///
 /// `name` is the name of the Rerun component, and the name of the pyarrow `Field` (column name).
-pub fn array_to_rust(
-    arrow_array: &Bound<'_, PyAny>,
-    component_descr: &ComponentDescriptor,
-) -> PyResult<(ArrowArrayRef, ArrowField)> {
+pub fn array_to_rust(arrow_array: &Bound<'_, PyAny>) -> PyResult<ArrowArrayRef> {
     let py_array: PyArrowType<ArrowArrayData> = arrow_array.extract()?;
-    let array = make_array(py_array.0);
-
-    let datatype = array.data_type();
-    let metadata = TransportChunk::field_metadata_component_descriptor(component_descr);
-    let field = ArrowField::new(
-        component_descr.component_name.to_string(),
-        datatype.clone(),
-        true,
-    )
-    .with_metadata(metadata);
-
-    Ok((array, field))
+    Ok(make_array(py_array.0))
 }
 
 /// Build a [`PendingRow`] given a '**kwargs'-style dictionary of component arrays.
@@ -85,8 +71,7 @@ pub fn build_row_from_components(
     let mut components = IntMap::default();
     for (component_descr, array) in components_per_descr {
         let component_descr = descriptor_to_rust(&component_descr)?;
-        let (list_array, _field) = array_to_rust(&array, &component_descr)?;
-
+        let list_array = array_to_rust(&array)?;
         components.insert(component_descr, list_array);
     }
 
@@ -107,36 +92,36 @@ pub fn build_chunk_from_components(
     let chunk_id = ChunkId::new();
 
     // Extract the timeline data
-    let (arrays, fields): (Vec<ArrowArrayRef>, Vec<ArrowField>) = itertools::process_results(
-        timelines.iter().map(|(name, array)| {
-            let py_name = name.downcast::<PyString>()?;
-            let name: std::borrow::Cow<'_, str> = py_name.extract()?;
-            array_to_rust(&array, &ComponentDescriptor::new(name.to_string()))
-        }),
-        |iter| iter.unzip(),
-    )?;
+    let (arrays, timeline_names): (Vec<ArrowArrayRef>, Vec<TimelineName>) =
+        itertools::process_results(
+            timelines.iter().map(|(name, array)| {
+                let py_name = name.downcast::<PyString>()?;
+                let name: std::borrow::Cow<'_, str> = py_name.extract()?;
+                let timeline_name: TimelineName = name.as_ref().into();
+                array_to_rust(&array).map(|array| (array, timeline_name))
+            }),
+            |iter| iter.unzip(),
+        )?;
 
     let timelines: Result<Vec<_>, ChunkError> = arrays
         .into_iter()
-        .zip(fields)
-        .map(|(array, field)| {
-            let timeline_data =
-                TimeColumn::read_array(&ArrowArrayRef::from(array)).map_err(|err| {
-                    ChunkError::Malformed {
-                        reason: format!("Invalid timeline {}: {err}", field.name()),
-                    }
-                })?;
-            let timeline = match field.data_type() {
-                arrow::datatypes::DataType::Int64 => {
-                    Ok(Timeline::new_sequence(field.name().clone()))
-                }
+        .zip(timeline_names)
+        .map(|(array, timeline_name)| {
+            let timeline = match array.data_type() {
+                arrow::datatypes::DataType::Int64 => Ok(Timeline::new_sequence(timeline_name)),
                 arrow::datatypes::DataType::Timestamp(_, _) => {
-                    Ok(Timeline::new_temporal(field.name().clone()))
+                    Ok(Timeline::new_temporal(timeline_name))
                 }
                 _ => Err(ChunkError::Malformed {
-                    reason: format!("Invalid data_type for timeline: {}", field.name()),
+                    reason: format!("Invalid data_type for timeline: {timeline_name}"),
                 }),
             }?;
+            let timeline_data =
+                TimeColumn::read_array(&ArrowArrayRef::from(array)).map_err(|err| {
+                    ChunkError::Malformed {
+                        reason: format!("Invalid timeline {timeline_name}: {err}"),
+                    }
+                })?;
             Ok((timeline, timeline_data))
         })
         .collect();
@@ -148,31 +133,33 @@ pub fn build_chunk_from_components(
         .collect();
 
     // Extract the component data
-    let (arrays, fields): (Vec<ArrowArrayRef>, Vec<ArrowField>) = itertools::process_results(
-        components_per_descr.iter().map(|(component_descr, array)| {
-            array_to_rust(&array, &descriptor_to_rust(&component_descr)?)
-        }),
-        |iter| iter.unzip(),
-    )?;
+    let (arrays, component_descrs): (Vec<ArrowArrayRef>, Vec<ComponentDescriptor>) =
+        itertools::process_results(
+            components_per_descr.iter().map(|(component_descr, array)| {
+                let component_descr = descriptor_to_rust(&component_descr)?;
+                array_to_rust(&array).map(|array| (array, component_descr))
+            }),
+            |iter| iter.unzip(),
+        )?;
 
     let components: Result<Vec<(ComponentDescriptor, _)>, ChunkError> = arrays
         .into_iter()
-        .zip(fields)
-        .map(|(value, field)| {
-            let batch = if let Some(batch) = value.downcast_array_ref::<ArrowListArray>() {
+        .zip(component_descrs)
+        .map(|(list_array, descr)| {
+            let batch = if let Some(batch) = list_array.downcast_array_ref::<ArrowListArray>() {
                 batch.clone()
             } else {
                 let offsets =
-                    ArrowOffsetBuffer::from_lengths(std::iter::repeat(1).take(value.len()));
-                let field = ArrowField::new("item", value.data_type().clone(), true).into();
-                ArrowListArray::try_new(field, offsets, value, None).map_err(|err| {
+                    ArrowOffsetBuffer::from_lengths(std::iter::repeat(1).take(list_array.len()));
+                let field = ArrowField::new("item", list_array.data_type().clone(), true).into();
+                ArrowListArray::try_new(field, offsets, list_array, None).map_err(|err| {
                     ChunkError::Malformed {
                         reason: format!("Failed to wrap in List array: {err}"),
                     }
                 })?
            };
 
-            Ok((ComponentDescriptor::new(field.name().clone()), batch))
+            Ok((descr, batch))
        })
        .collect();

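With the fabricated field gone, the timeline hunk above derives the timeline kind straight from the array's Arrow datatype: `Int64` becomes a sequence timeline, `Timestamp` a temporal one. A standalone sketch of that inference, assuming only the `arrow` crate (`TimelineKind` is a hypothetical stand-in for the `Timeline::new_sequence` / `Timeline::new_temporal` constructors used in the diff):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, TimestampNanosecondArray};
use arrow::datatypes::DataType;

// Hypothetical stand-in for rerun's two timeline constructors.
#[derive(Debug, PartialEq)]
enum TimelineKind {
    Sequence,
    Temporal,
}

// Mirror of the match in build_chunk_from_components: the kind is read off the
// array's datatype instead of an ArrowField created from a descriptor.
fn infer_timeline_kind(array: &ArrayRef) -> Result<TimelineKind, String> {
    match array.data_type() {
        DataType::Int64 => Ok(TimelineKind::Sequence),
        DataType::Timestamp(_, _) => Ok(TimelineKind::Temporal),
        other => Err(format!("Invalid data_type for timeline: {other:?}")),
    }
}

fn main() {
    let frames: ArrayRef = Arc::new(Int64Array::from(vec![0_i64, 1, 2]));
    let times: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1_000_i64, 2_000]));

    assert_eq!(infer_timeline_kind(&frames), Ok(TimelineKind::Sequence));
    assert_eq!(infer_timeline_kind(&times), Ok(TimelineKind::Temporal));
}
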
rerun_py/src/video.rs (+1 −7)

@@ -3,7 +3,6 @@
 use pyo3::{exceptions::PyRuntimeError, pyfunction, Bound, PyAny, PyResult};
 
 use re_arrow_util::ArrowArrayDowncastRef as _;
-use re_sdk::ComponentDescriptor;
 use re_video::VideoLoadError;
 
 use crate::arrow::array_to_rust;
@@ -20,12 +19,7 @@ pub fn asset_video_read_frame_timestamps_ns(
     video_bytes_arrow_array: &Bound<'_, PyAny>,
     media_type: Option<&str>,
 ) -> PyResult<Vec<i64>> {
-    let component_descr = ComponentDescriptor {
-        archetype_name: Some("rerun.archetypes.AssetVideo".into()),
-        archetype_field_name: Some("blob".into()),
-        component_name: "rerun.components.Blob".into(),
-    };
-    let video_bytes_arrow_array = array_to_rust(video_bytes_arrow_array, &component_descr)?.0;
+    let video_bytes_arrow_array = array_to_rust(video_bytes_arrow_array)?;
 
     let video_bytes_arrow_uint8_array = video_bytes_arrow_array
         .downcast_array_ref::<arrow::array::ListArray>()

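For context, the function above then downcasts the array it now gets straight from `array_to_rust` to a list of bytes. A self-contained sketch of that step, assuming only the `arrow` crate (the data is a made-up placeholder, and `re_arrow_util`'s `downcast_array_ref` helper is replaced by plain `as_any` downcasting):

use std::sync::Arc;

use arrow::array::{Array, ListArray, UInt8Array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};

fn main() {
    // Build a one-row ListArray<UInt8>, standing in for the pyarrow-provided video bytes.
    let values = UInt8Array::from(vec![0u8, 1, 2, 3]);
    let field = Arc::new(Field::new("item", DataType::UInt8, true));
    let offsets = OffsetBuffer::from_lengths([4]);
    let list = ListArray::new(field, offsets, Arc::new(values), None);

    // Downcast the dynamically typed array back to a ListArray, then to its UInt8
    // values: the same shape of access as in asset_video_read_frame_timestamps_ns.
    let dyn_array: &dyn Array = &list;
    let list_array = dyn_array
        .as_any()
        .downcast_ref::<ListArray>()
        .expect("expected a ListArray");
    let bytes = list_array
        .values()
        .as_any()
        .downcast_ref::<UInt8Array>()
        .expect("expected UInt8 values");

    println!("video payload: {:?}", bytes.values());
}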