Commit 339b4c5

Merge branch 'main' into benliepert/update_ndarray
2 parents: 90b32eb + ce83447

4 files changed (+544 -49 lines)

crates/store/re_chunk/src/util.rs (+93 -4)
```diff
@@ -1,10 +1,14 @@
 use arrow2::{
-    array::{Array as ArrowArray, BooleanArray as ArrowBooleanArray, ListArray as ArrowListArray},
+    array::{
+        Array as ArrowArray, BooleanArray as ArrowBooleanArray,
+        DictionaryArray as ArrowDictionaryArray, ListArray as ArrowListArray,
+        PrimitiveArray as ArrowPrimitiveArray,
+    },
     bitmap::Bitmap as ArrowBitmap,
-    datatypes::DataType as ArrowDataType,
+    datatypes::DataType as ArrowDatatype,
     offset::Offsets as ArrowOffsets,
 };
-use itertools::Itertools as _;
+use itertools::Itertools;
 
 // ---
 
@@ -29,7 +33,7 @@ pub fn arrays_to_list_array_opt(arrays: &[Option<&dyn ArrowArray>]) -> Option<Ar
 ///
 /// Returns an empty list if `arrays` is empty.
 pub fn arrays_to_list_array(
-    array_datatype: ArrowDataType,
+    array_datatype: ArrowDatatype,
     arrays: &[Option<&dyn ArrowArray>],
 ) -> Option<ArrowListArray<i32>> {
     let arrays_dense = arrays.iter().flatten().copied().collect_vec();
@@ -66,6 +70,91 @@
     ))
 }
 
+/// Create a sparse dictionary-array out of an array of (potentially) duplicated arrays.
+///
+/// The `Idx` is used as primary key to drive the deduplication process.
+/// Returns `None` if any of the specified `arrays` doesn't match the given `array_datatype`.
+///
+/// Returns an empty dictionary if `arrays` is empty.
+//
+// TODO(cmc): Ideally I would prefer to just use the array's underlying pointer as primary key, but
+// this has proved extremely brittle in practice. Maybe once we move to arrow-rs.
+// TODO(cmc): A possible improvement would be to pick the smallest key datatype possible based
+// on the cardinality of the input arrays.
+pub fn arrays_to_dictionary<Idx: Copy + Eq>(
+    array_datatype: ArrowDatatype,
+    arrays: &[Option<(Idx, &dyn ArrowArray)>],
+) -> Option<ArrowDictionaryArray<u32>> {
+    // Dedupe the input arrays based on the given primary key.
+    let arrays_dense_deduped = arrays
+        .iter()
+        .flatten()
+        .copied()
+        .dedup_by(|(lhs_index, _), (rhs_index, _)| lhs_index == rhs_index)
+        .map(|(_index, array)| array)
+        .collect_vec();
+
+    // Compute the keys for the final dictionary, using that same primary key.
+    let keys = {
+        let mut cur_key = 0u32;
+        arrays
+            .iter()
+            .dedup_by_with_count(|lhs, rhs| {
+                lhs.map(|(index, _)| index) == rhs.map(|(index, _)| index)
+            })
+            .flat_map(|(count, value)| {
+                if value.is_some() {
+                    let keys = std::iter::repeat(Some(cur_key)).take(count);
+                    cur_key += 1;
+                    keys
+                } else {
+                    std::iter::repeat(None).take(count)
+                }
+            })
+            .collect_vec()
+    };
+
+    // Concatenate the underlying data as usual, except only the _unique_ values!
+    let data = if arrays_dense_deduped.is_empty() {
+        arrow2::array::new_empty_array(array_datatype.clone())
+    } else {
+        arrow2::compute::concatenate::concatenate(&arrays_dense_deduped)
+            .map_err(|err| {
+                re_log::warn_once!("failed to concatenate arrays: {err}");
+                err
+            })
+            .ok()?
+    };
+
+    // We still need the underlying data to be a list-array, so the dictionary's keys can index
+    // into this list-array.
+    let data = {
+        let datatype = ArrowListArray::<i32>::default_datatype(array_datatype);
+
+        #[allow(clippy::unwrap_used)] // yes, these are indeed lengths
+        let offsets =
+            ArrowOffsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len()))
+                .unwrap();
+
+        ArrowListArray::<i32>::new(datatype, offsets.into(), data, None)
+    };
+
+    let datatype = ArrowDatatype::Dictionary(
+        arrow2::datatypes::IntegerType::UInt32,
+        std::sync::Arc::new(data.data_type().clone()),
+        true, // is_sorted
+    );
+
+    // And finally we build our dictionary, which indexes into our concatenated list-array of
+    // unique values.
+    ArrowDictionaryArray::try_new(
+        datatype,
+        ArrowPrimitiveArray::<u32>::from(keys),
+        data.to_boxed(),
+    )
+    .ok()
+}
+
 /// Given a sparse `ArrowListArray` (i.e. an array with a validity bitmap that contains at least
 /// one falsy value), returns a dense `ArrowListArray` that only contains the non-null values from
 /// the original list.
```
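Worth noting: `arrays_to_dictionary` only collapses *consecutive* runs sharing a primary key, because `dedup_by`/`dedup_by_with_count` compare neighboring elements. Below is a minimal standalone sketch (not part of this commit) of the key-assignment logic, with plain integers standing in for the `(Idx, &dyn ArrowArray)` pairs and only `itertools` assumed:

```rust
use itertools::Itertools as _;

fn main() {
    // Stand-ins for the primary keys of `(Idx, &dyn ArrowArray)` pairs; the
    // arrays themselves play no part in the key computation.
    let arrays: Vec<Option<u64>> = vec![Some(1), Some(1), None, Some(2), Some(2), Some(1)];

    // Mirrors the `keys` computation in `arrays_to_dictionary`.
    let mut cur_key = 0u32;
    let keys: Vec<Option<u32>> = arrays
        .iter()
        .dedup_by_with_count(|lhs, rhs| lhs == rhs)
        .flat_map(|(count, value)| {
            if value.is_some() {
                let keys = std::iter::repeat(Some(cur_key)).take(count);
                cur_key += 1;
                keys
            } else {
                std::iter::repeat(None).take(count)
            }
        })
        .collect();

    // Only *consecutive* duplicates collapse: the trailing `Some(1)` is not
    // merged with the leading run, so it gets a fresh key.
    assert_eq!(keys, vec![Some(0), Some(0), None, Some(1), Some(1), Some(2)]);
}
```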

crates/store/re_chunk_store/src/dataframe.rs (+8 -8)
```diff
@@ -55,11 +55,11 @@ impl ColumnDescriptor {
     }
 
     #[inline]
-    pub fn to_arrow_field(&self) -> ArrowField {
+    pub fn to_arrow_field(&self, datatype: Option<ArrowDatatype>) -> ArrowField {
         match self {
             Self::Control(descr) => descr.to_arrow_field(),
             Self::Time(descr) => descr.to_arrow_field(),
-            Self::Component(descr) => descr.to_arrow_field(),
+            Self::Component(descr) => descr.to_arrow_field(datatype),
         }
     }
 }
@@ -268,7 +268,7 @@ impl ComponentColumnDescriptor {
     }
 
     #[inline]
-    pub fn to_arrow_field(&self) -> ArrowField {
+    pub fn to_arrow_field(&self, wrapped_datatype: Option<ArrowDatatype>) -> ArrowField {
         let Self {
             entity_path,
             archetype_name,
@@ -278,14 +278,14 @@ impl ComponentColumnDescriptor {
             is_static,
         } = self;
 
+        // NOTE: Only the system doing the actual packing knows the final datatype with all of
+        // its wrappers (is it a component array? is it a list? is it a dict?).
+        let datatype = wrapped_datatype.unwrap_or_else(|| datatype.clone());
+
         // TODO(cmc): figure out who's in charge of adding the outer list layer.
         ArrowField::new(
             component_name.short_name().to_owned(),
-            ArrowDatatype::List(std::sync::Arc::new(ArrowField::new(
-                "item",
-                datatype.clone(),
-                true, /* nullable */
-            ))),
+            datatype,
             false, /* nullable */
         )
         // TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly.
```
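The motivation for the new `Option<ArrowDatatype>` parameter: depending on how a column was packed, the same component data may end up as a plain list-array or, via `arrays_to_dictionary` above, as a dictionary over a list-array of unique values, so only the packer knows the final wrapper. A small sketch (not from this commit) of the two wrapper datatypes, assuming the arrow2 fork used by this repo (where `DataType::Dictionary` takes an `Arc`):

```rust
use arrow2::array::ListArray;
use arrow2::datatypes::{DataType, IntegerType};

fn main() {
    let inner = DataType::Float32;

    // Plain packing: one list of component values per row.
    let as_list = ListArray::<i32>::default_datatype(inner);

    // Deduplicated packing: each row's u32 key indexes into a list-array of
    // unique values (mirrors the datatype built in `arrays_to_dictionary`).
    let as_dict = DataType::Dictionary(
        IntegerType::UInt32,
        std::sync::Arc::new(as_list.clone()),
        true, // is_sorted
    );

    println!("{as_list:?}\n{as_dict:?}");
}
```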

crates/store/re_dataframe/src/latest_at.rs (+164 -15)
```diff
@@ -93,20 +93,10 @@ impl LatestAtQueryHandle<'_> {
     /// [`Self::schema`].
     /// Columns that do not yield any data will still be present in the results, filled with null values.
     pub fn get(&self) -> RecordBatch {
-        re_tracing::profile_function!(format!("{:?}", self.query));
+        re_tracing::profile_function!(format!("{}", self.query));
 
         let columns = self.schema();
 
-        let schema = ArrowSchema {
-            fields: columns
-                .iter()
-                .map(ColumnDescriptor::to_arrow_field)
-                .collect(),
-
-            // TODO(#6889): properly some sorbet stuff we want to get in there at some point.
-            metadata: Default::default(),
-        };
-
         let all_units: HashMap<&ComponentColumnDescriptor, UnitChunkShared> = {
             re_tracing::profile_scope!("queries");
 
```
```diff
@@ -164,6 +154,10 @@
             }
         }
 
+        // If the query didn't return anything at all, we just want a properly empty RecordBatch
+        // with the right schema.
+        let null_array_length = max_time_per_timeline.get(&self.query.timeline).is_some() as usize;
+
         // NOTE: Keep in mind this must match the ordering specified by `Self::schema`.
         let packed_arrays = {
             re_tracing::profile_scope!("packing");
```
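The `is_some() as usize` cast carries the whole trick: null-filled columns come out zero rows long when the queried timeline yielded nothing, and one row long otherwise. A trivial standalone sketch (names are stand-ins):

```rust
fn main() {
    // Stand-in for `max_time_per_timeline.get(&self.query.timeline)`.
    let max_time: Option<i64> = None;
    assert_eq!(max_time.is_some() as usize, 0); // no data -> empty columns

    let max_time: Option<i64> = Some(42);
    assert_eq!(max_time.is_some() as usize, 1); // data -> one latest-at row
}
```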
```diff
@@ -186,7 +180,12 @@
                         .and_then(|(_, chunk)| chunk.timelines().get(&descr.timeline).cloned());
 
                     Some(time_column.map_or_else(
-                        || arrow2::array::new_null_array(descr.datatype.clone(), 1),
+                        || {
+                            arrow2::array::new_null_array(
+                                descr.datatype.clone(),
+                                null_array_length,
+                            )
+                        },
                         |time_column| time_column.times_array().to_boxed(),
                     ))
                 }
```
```diff
@@ -196,16 +195,28 @@
                         .get(descr)
                         .and_then(|chunk| chunk.components().get(&descr.component_name))
                         .map_or_else(
-                            || arrow2::array::new_null_array(descr.datatype.clone(), 1),
+                            || {
+                                arrow2::array::new_null_array(
+                                    descr.datatype.clone(),
+                                    null_array_length,
+                                )
+                            },
                             |list_array| list_array.to_boxed(),
                         ),
                 ),
             })
-            .collect()
+            .collect_vec()
         };
 
         RecordBatch {
-            schema,
+            schema: ArrowSchema {
+                fields: columns
+                    .iter()
+                    .zip(packed_arrays.iter())
+                    .map(|(descr, arr)| descr.to_arrow_field(Some(arr.data_type().clone())))
+                    .collect(),
+                metadata: Default::default(),
+            },
             data: ArrowChunk::new(packed_arrays),
         }
     }
```
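Deriving each field's datatype from the array actually produced, rather than from the descriptor, means schema and data can no longer disagree, whatever wrappers the packer chose. A standalone sketch of the zip, using plain arrow2 and stand-in names:

```rust
use arrow2::array::{Array, Int64Array};
use arrow2::datatypes::{DataType, Field};

fn main() {
    let names = ["log_time", "log_tick"];
    let packed_arrays: Vec<Box<dyn Array>> = vec![
        Int64Array::from_slice([0]).boxed(),
        Int64Array::from_slice([1]).boxed(),
    ];

    // Each field's datatype comes straight from the packed array itself.
    let fields: Vec<Field> = names
        .iter()
        .zip(packed_arrays.iter())
        .map(|(name, arr)| Field::new(*name, arr.data_type().clone(), false))
        .collect();

    assert_eq!(fields[0].data_type, DataType::Int64);
}
```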
```diff
@@ -225,3 +236,141 @@ impl<'a> LatestAtQueryHandle<'a> {
         })
     }
 }
+
+// ---
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use re_chunk::{ArrowArray, Chunk, EntityPath, RowId, TimeInt, TimePoint, Timeline};
+    use re_chunk_store::{
+        ChunkStore, ChunkStoreConfig, ColumnDescriptor, ComponentColumnDescriptor,
+        LatestAtQueryExpression, TimeColumnDescriptor,
+    };
+    use re_log_types::{example_components::MyPoint, StoreId, StoreKind};
+    use re_query::Caches;
+    use re_types::{
+        components::{Color, Position3D, Radius},
+        Loggable,
+    };
+
+    use crate::QueryEngine;
+
+    #[test]
+    fn empty_yields_empty() {
+        let store = ChunkStore::new(
+            StoreId::random(StoreKind::Recording),
+            ChunkStoreConfig::default(),
+        );
+        let cache = Caches::new(&store);
+        let engine = QueryEngine {
+            store: &store,
+            cache: &cache,
+        };
+
+        let query = LatestAtQueryExpression {
+            entity_path_expr: "/**".into(),
+            timeline: Timeline::log_time(),
+            at: TimeInt::MAX,
+        };
+
+        let entity_path: EntityPath = "/points".into();
+        let columns = vec![
+            ColumnDescriptor::Time(TimeColumnDescriptor {
+                timeline: Timeline::log_time(),
+                datatype: Timeline::log_time().datatype(),
+            }),
+            ColumnDescriptor::Time(TimeColumnDescriptor {
+                timeline: Timeline::log_tick(),
+                datatype: Timeline::log_tick().datatype(),
+            }),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Position3D>(
+                entity_path.clone(),
+            )),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Radius>(
+                entity_path.clone(),
+            )),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Color>(entity_path)),
+        ];
+
+        let handle = engine.latest_at(&query, Some(columns.clone()));
+        let batch = handle.get();
+
+        // The output should be an empty RecordBatch with the right schema and empty arrays.
+        assert_eq!(0, batch.num_rows());
+        assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
+            .all(|(descr, field)| descr.to_arrow_field(None) == *field));
+        assert!(itertools::izip!(columns.iter(), batch.data.iter())
+            .all(|(descr, array)| descr.datatype() == array.data_type()));
+    }
+
+    #[test]
+    fn static_does_yield() {
+        let mut store = ChunkStore::new(
+            StoreId::random(StoreKind::Recording),
+            ChunkStoreConfig::default(),
+        );
+
+        let entity_path: EntityPath = "/points".into();
+        let chunk = Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batches(
+                    RowId::new(),
+                    TimePoint::default(),
+                    [&[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)] as _],
+                )
+                .build()
+                .unwrap(),
+        );
+        _ = store.insert_chunk(&chunk);
+
+        eprintln!("{store}");
+
+        let cache = Caches::new(&store);
+        let engine = QueryEngine {
+            store: &store,
+            cache: &cache,
+        };
+
+        let query = LatestAtQueryExpression {
+            entity_path_expr: "/**".into(),
+            timeline: Timeline::log_time(),
+            at: TimeInt::MAX,
+        };
+
+        let columns = vec![
+            ColumnDescriptor::Time(TimeColumnDescriptor {
+                timeline: Timeline::log_time(),
+                datatype: Timeline::log_time().datatype(),
+            }),
+            ColumnDescriptor::Time(TimeColumnDescriptor {
+                timeline: Timeline::log_tick(),
+                datatype: Timeline::log_tick().datatype(),
+            }),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<MyPoint>(
+                entity_path.clone(),
+            )),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Radius>(
+                entity_path.clone(),
+            )),
+            ColumnDescriptor::Component(ComponentColumnDescriptor::new::<Color>(entity_path)),
+        ];
+
+        let handle = engine.latest_at(&query, Some(columns.clone()));
+        let batch = handle.get();
+
+        assert_eq!(1, batch.num_rows());
+        assert_eq!(
+            chunk.components().get(&MyPoint::name()).unwrap().to_boxed(),
+            itertools::izip!(batch.schema.fields.iter(), batch.data.iter())
+                .find_map(
+                    |(field, array)| (field.name == MyPoint::name().short_name())
+                        .then_some(array.clone())
+                )
+                .unwrap()
+        );
+        assert!(itertools::izip!(columns.iter(), batch.schema.fields.iter())
+            .all(|(descr, field)| descr.to_arrow_field(None) == *field));
+    }
+}
```
