Skip to content

Commit a29741e

Browse files
orlpjsjasonseba
authored and committed
perf: Improve join performance for new-streaming engine (pola-rs#21620)
1 parent e38255d commit a29741e

File tree

14 files changed

+973
-429
lines changed

14 files changed

+973
-429
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/polars-arrow/src/array/primitive/builder.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,12 @@ impl<T: NativeType> StaticArrayBuilder for PrimitiveArrayBuilder<T> {
7676
idxs: &[IdxSize],
7777
_share: ShareStrategy,
7878
) {
79-
self.values.reserve(idxs.len());
80-
for idx in idxs {
81-
self.values
82-
.push_unchecked(other.value_unchecked(*idx as usize));
83-
}
79+
// TODO: SIMD gather kernels?
80+
let other_values_slice = other.values().as_slice();
81+
self.values.extend(
82+
idxs.iter()
83+
.map(|idx| *other_values_slice.get_unchecked(*idx as usize)),
84+
);
8485
self.validity
8586
.gather_extend_from_opt_validity(other.validity(), idxs);
8687
}

crates/polars-arrow/src/bitmap/builder.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,18 @@ impl BitmapBuilder {
251251
self.extend_from_slice(slice, bm_offset + start, length);
252252
}
253253

254+
pub fn subslice_extend_from_opt_validity(
255+
&mut self,
256+
bitmap: Option<&Bitmap>,
257+
start: usize,
258+
length: usize,
259+
) {
260+
match bitmap {
261+
Some(bm) => self.subslice_extend_from_bitmap(bm, start, length),
262+
None => self.extend_constant(length, true),
263+
}
264+
}
265+
254266
/// # Safety
255267
/// The indices must be in-bounds.
256268
pub unsafe fn gather_extend_from_slice(
@@ -308,6 +320,43 @@ impl BitmapBuilder {
308320
self.opt_gather_extend_from_slice(slice, offset, length, idxs);
309321
}
310322

323+
/// # Safety
324+
/// The indices must be in-bounds.
325+
pub unsafe fn gather_extend_from_opt_validity(
326+
&mut self,
327+
bitmap: Option<&Bitmap>,
328+
idxs: &[IdxSize],
329+
length: usize,
330+
) {
331+
if let Some(bm) = bitmap {
332+
let (slice, offset, sl_length) = bm.as_slice();
333+
debug_assert_eq!(sl_length, length);
334+
self.gather_extend_from_slice(slice, offset, length, idxs);
335+
} else {
336+
self.extend_constant(length, true);
337+
}
338+
}
339+
340+
pub fn opt_gather_extend_from_opt_validity(
341+
&mut self,
342+
bitmap: Option<&Bitmap>,
343+
idxs: &[IdxSize],
344+
length: usize,
345+
) {
346+
if let Some(bm) = bitmap {
347+
let (slice, offset, sl_length) = bm.as_slice();
348+
debug_assert_eq!(sl_length, length);
349+
self.opt_gather_extend_from_slice(slice, offset, sl_length, idxs);
350+
} else {
351+
unsafe {
352+
self.reserve(idxs.len());
353+
for idx in idxs {
354+
self.push_unchecked((*idx as usize) < length);
355+
}
356+
}
357+
}
358+
}
359+
311360
/// # Safety
312361
/// May only be called once at the end.
313362
unsafe fn finish(&mut self) {

crates/polars-core/src/chunked_array/object/builder.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
use arrow::array::builder::{ArrayBuilder, ShareStrategy};
12
use arrow::bitmap::BitmapBuilder;
3+
use polars_utils::vec::PushUnchecked;
24

35
use super::*;
46
use crate::utils::get_iter_capacity;
@@ -176,3 +178,96 @@ pub(crate) fn object_series_to_arrow_array(s: &Series) -> ArrayRef {
176178
let arr = arr.as_any().downcast_ref::<ListArray<i64>>().unwrap();
177179
arr.values().to_boxed()
178180
}
181+
182+
impl<T: PolarsObject> ArrayBuilder for ObjectChunkedBuilder<T> {
183+
fn dtype(&self) -> &ArrowDataType {
184+
&ArrowDataType::FixedSizeBinary(size_of::<T>())
185+
}
186+
187+
fn reserve(&mut self, additional: usize) {
188+
self.bitmask_builder.reserve(additional);
189+
self.values.reserve(additional);
190+
}
191+
192+
fn freeze(self) -> Box<dyn Array> {
193+
Box::new(ObjectArray {
194+
values: self.values.into(),
195+
validity: self.bitmask_builder.into_opt_validity(),
196+
})
197+
}
198+
199+
fn freeze_reset(&mut self) -> Box<dyn Array> {
200+
Box::new(ObjectArray {
201+
values: core::mem::take(&mut self.values).into(),
202+
validity: core::mem::take(&mut self.bitmask_builder).into_opt_validity(),
203+
})
204+
}
205+
206+
fn len(&self) -> usize {
207+
self.values.len()
208+
}
209+
210+
fn extend_nulls(&mut self, length: usize) {
211+
self.values.resize(self.values.len() + length, T::default());
212+
self.bitmask_builder.extend_constant(length, false);
213+
}
214+
215+
fn subslice_extend(
216+
&mut self,
217+
other: &dyn Array,
218+
start: usize,
219+
length: usize,
220+
_share: ShareStrategy,
221+
) {
222+
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
223+
self.values
224+
.extend_from_slice(&other.values[start..start + length]);
225+
self.bitmask_builder
226+
.subslice_extend_from_opt_validity(other.validity(), start, length);
227+
}
228+
229+
fn subslice_extend_repeated(
230+
&mut self,
231+
other: &dyn Array,
232+
start: usize,
233+
length: usize,
234+
repeats: usize,
235+
share: ShareStrategy,
236+
) {
237+
for _ in 0..repeats {
238+
self.subslice_extend(other, start, length, share)
239+
}
240+
}
241+
242+
unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) {
243+
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
244+
let other_values_slice = other.values.as_slice();
245+
self.values.extend(
246+
idxs.iter()
247+
.map(|idx| other_values_slice.get_unchecked(*idx as usize).clone()),
248+
);
249+
self.bitmask_builder
250+
.gather_extend_from_opt_validity(other.validity(), idxs, other.len());
251+
}
252+
253+
fn opt_gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) {
254+
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
255+
let other_values_slice = other.values.as_slice();
256+
self.values.reserve(idxs.len());
257+
unsafe {
258+
for idx in idxs {
259+
let val = if (*idx as usize) < other.len() {
260+
other_values_slice.get_unchecked(*idx as usize).clone()
261+
} else {
262+
T::default()
263+
};
264+
self.values.push_unchecked(val);
265+
}
266+
}
267+
self.bitmask_builder.opt_gather_extend_from_opt_validity(
268+
other.validity(),
269+
idxs,
270+
other.len(),
271+
);
272+
}
273+
}

crates/polars-core/src/chunked_array/object/registry.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::fmt::{Debug, Formatter};
77
use std::ops::Deref;
88
use std::sync::{Arc, LazyLock, RwLock};
99

10+
use arrow::array::builder::ArrayBuilder;
1011
use arrow::array::ArrayRef;
1112
use arrow::datatypes::ArrowDataType;
1213
use polars_utils::pl_str::PlSmallStr;
@@ -40,7 +41,9 @@ static GLOBAL_OBJECT_REGISTRY: LazyLock<RwLock<Option<ObjectRegistry>>> =
4041

4142
/// This trait can be registered, after which that global registration
4243
/// can be used to materialize object types
43-
pub trait AnonymousObjectBuilder {
44+
pub trait AnonymousObjectBuilder: ArrayBuilder {
45+
fn as_array_builder(self: Box<Self>) -> Box<dyn ArrayBuilder>;
46+
4447
/// # Safety
4548
/// Expect `ObjectArray<T>` arrays.
4649
unsafe fn from_chunks(self: Box<Self>, chunks: Vec<ArrayRef>) -> Series;
@@ -73,12 +76,17 @@ pub trait AnonymousObjectBuilder {
7376
}
7477

7578
impl<T: PolarsObject> AnonymousObjectBuilder for ObjectChunkedBuilder<T> {
76-
// Expect ObjectArray<T> arrays.
79+
/// # Safety
80+
/// Expects `ObjectArray<T>` arrays.
7781
unsafe fn from_chunks(self: Box<Self>, chunks: Vec<ArrayRef>) -> Series {
7882
ObjectChunked::<T>::new_with_compute_len(Arc::new(self.field().clone()), chunks)
7983
.into_series()
8084
}
8185

86+
fn as_array_builder(self: Box<Self>) -> Box<dyn ArrayBuilder> {
87+
self
88+
}
89+
8290
fn append_null(&mut self) {
8391
self.append_null()
8492
}

crates/polars-core/src/series/builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use arrow::array::builder::{make_builder, ArrayBuilder, ShareStrategy};
22
use polars_utils::IdxSize;
33

4+
#[cfg(feature = "object")]
5+
use crate::chunked_array::object::registry::get_object_builder;
46
use crate::prelude::*;
57
use crate::utils::Container;
68

@@ -12,6 +14,13 @@ pub struct SeriesBuilder {
1214

1315
impl SeriesBuilder {
1416
pub fn new(dtype: DataType) -> Self {
17+
// FIXME: get rid of this hack.
18+
#[cfg(feature = "object")]
19+
if matches!(dtype, DataType::Object(_)) {
20+
let builder = get_object_builder(PlSmallStr::EMPTY, 0).as_array_builder();
21+
return Self { dtype, builder };
22+
}
23+
1524
let builder = make_builder(&dtype.to_physical().to_arrow(CompatLevel::newest()));
1625
Self { dtype, builder }
1726
}

crates/polars-expr/src/hash_keys.rs

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,25 @@ impl HashKeys {
7474
self.len() == 0
7575
}
7676

77-
/// After this call partition_idxs[p] will contain the indices of hashes
78-
/// that belong to partition p, and the cardinality sketches are updated
79-
/// accordingly.
80-
pub fn gen_partition_idxs(
77+
/// After this call partitions will be extended with the partition for each
78+
/// hash. Nulls are assigned IdxSize::MAX or a specific partition depending
79+
/// on whether partition_nulls is true.
80+
pub fn gen_partitions(
81+
&self,
82+
partitioner: &HashPartitioner,
83+
partitions: &mut Vec<IdxSize>,
84+
partition_nulls: bool,
85+
) {
86+
match self {
87+
Self::RowEncoded(s) => s.gen_partitions(partitioner, partitions, partition_nulls),
88+
Self::Single(s) => s.gen_partitions(partitioner, partitions, partition_nulls),
89+
}
90+
}
91+
92+
/// After this call partition_idxs[p] will be extended with the indices of
93+
/// hashes that belong to partition p, and the cardinality sketches are
94+
/// updated accordingly.
95+
pub fn gen_idxs_per_partition(
8196
&self,
8297
partitioner: &HashPartitioner,
8398
partition_idxs: &mut [Vec<IdxSize>],
@@ -86,13 +101,13 @@ impl HashKeys {
86101
) {
87102
if sketches.is_empty() {
88103
match self {
89-
Self::RowEncoded(s) => s.gen_partition_idxs::<false>(
104+
Self::RowEncoded(s) => s.gen_idxs_per_partition::<false>(
90105
partitioner,
91106
partition_idxs,
92107
sketches,
93108
partition_nulls,
94109
),
95-
Self::Single(s) => s.gen_partition_idxs::<false>(
110+
Self::Single(s) => s.gen_idxs_per_partition::<false>(
96111
partitioner,
97112
partition_idxs,
98113
sketches,
@@ -101,13 +116,13 @@ impl HashKeys {
101116
}
102117
} else {
103118
match self {
104-
Self::RowEncoded(s) => s.gen_partition_idxs::<true>(
119+
Self::RowEncoded(s) => s.gen_idxs_per_partition::<true>(
105120
partitioner,
106121
partition_idxs,
107122
sketches,
108123
partition_nulls,
109124
),
110-
Self::Single(s) => s.gen_partition_idxs::<true>(
125+
Self::Single(s) => s.gen_idxs_per_partition::<true>(
111126
partitioner,
112127
partition_idxs,
113128
sketches,
@@ -159,7 +174,33 @@ pub struct RowEncodedKeys {
159174
}
160175

161176
impl RowEncodedKeys {
162-
pub fn gen_partition_idxs<const BUILD_SKETCHES: bool>(
177+
pub fn gen_partitions(
178+
&self,
179+
partitioner: &HashPartitioner,
180+
partitions: &mut Vec<IdxSize>,
181+
partition_nulls: bool,
182+
) {
183+
partitions.reserve(self.hashes.len());
184+
if let Some(validity) = self.keys.validity() {
185+
// Arbitrarily put nulls in partition 0.
186+
let null_p = if partition_nulls { 0 } else { IdxSize::MAX };
187+
partitions.extend(self.hashes.values_iter().zip(validity).map(|(h, is_v)| {
188+
if is_v {
189+
partitioner.hash_to_partition(*h) as IdxSize
190+
} else {
191+
null_p
192+
}
193+
}))
194+
} else {
195+
partitions.extend(
196+
self.hashes
197+
.values_iter()
198+
.map(|h| partitioner.hash_to_partition(*h) as IdxSize),
199+
)
200+
}
201+
}
202+
203+
pub fn gen_idxs_per_partition<const BUILD_SKETCHES: bool>(
163204
&self,
164205
partitioner: &HashPartitioner,
165206
partition_idxs: &mut [Vec<IdxSize>],
@@ -168,9 +209,6 @@ impl RowEncodedKeys {
168209
) {
169210
assert!(partition_idxs.len() == partitioner.num_partitions());
170211
assert!(!BUILD_SKETCHES || sketches.len() == partitioner.num_partitions());
171-
for p in partition_idxs.iter_mut() {
172-
p.clear();
173-
}
174212

175213
if let Some(validity) = self.keys.validity() {
176214
for (i, (h, is_v)) in self.hashes.values_iter().zip(validity).enumerate() {
@@ -264,17 +302,24 @@ pub struct SingleKeys {
264302
}
265303

266304
impl SingleKeys {
267-
pub fn gen_partition_idxs<const BUILD_SKETCHES: bool>(
305+
#[allow(clippy::ptr_arg)] // Remove when implemented.
306+
pub fn gen_partitions(
307+
&self,
308+
_partitioner: &HashPartitioner,
309+
_partitions: &mut Vec<IdxSize>,
310+
_partition_nulls: bool,
311+
) {
312+
todo!()
313+
}
314+
315+
pub fn gen_idxs_per_partition<const BUILD_SKETCHES: bool>(
268316
&self,
269317
partitioner: &HashPartitioner,
270318
partition_idxs: &mut [Vec<IdxSize>],
271319
_sketches: &mut [CardinalitySketch],
272320
_partition_nulls: bool,
273321
) {
274322
assert!(partitioner.num_partitions() == partition_idxs.len());
275-
for p in partition_idxs.iter_mut() {
276-
p.clear();
277-
}
278323

279324
todo!()
280325
}

0 commit comments

Comments
 (0)