Add benchmark for parquet reader with row_filter and project settings #7401

Merged: 25 commits, Apr 15, 2025
Changes from 6 commits

Commits (25)
822760c  Add benchmark for parquet reader with row_filter and project settings (zhuqi-lucas, Apr 10, 2025)
31a544f  fix clippy (zhuqi-lucas, Apr 10, 2025)
b16428d  change bench mark to use asyn read to trigger the page cache (zhuqi-lucas, Apr 11, 2025)
1aacd01  fix (zhuqi-lucas, Apr 11, 2025)
2d58006  Merge remote-tracking branch 'upstream/main' into benchmark_row_filter (zhuqi-lucas, Apr 11, 2025)
768826e  fix (zhuqi-lucas, Apr 11, 2025)
f624b91  Update comments, add background (alamb, Apr 11, 2025)
6c28e44  incremently addressing the comments (zhuqi-lucas, Apr 11, 2025)
69a2617  Fix bool random (zhuqi-lucas, Apr 11, 2025)
b044813  Merge commit '69a2617' into alamb/docs_for_bench (alamb, Apr 11, 2025)
6a37818  fixup (alamb, Apr 11, 2025)
2f6ccbb  Add fn switch and project enum (zhuqi-lucas, Apr 11, 2025)
994c747  Merge pull request #1 from alamb/alamb/docs_for_bench (zhuqi-lucas, Apr 11, 2025)
d0a656b  Fix clippy (zhuqi-lucas, Apr 11, 2025)
67480b9  Address comment (zhuqi-lucas, Apr 12, 2025)
16bc1bf  Add float(half set) and int(full set) change (zhuqi-lucas, Apr 12, 2025)
7638c41  Address comments (zhuqi-lucas, Apr 13, 2025)
9271cc9  Set compression (zhuqi-lucas, Apr 14, 2025)
8e00ac5  fix (zhuqi-lucas, Apr 14, 2025)
890519e  Update comments (alamb, Apr 14, 2025)
7eb0476  refactor filter column indexes (alamb, Apr 14, 2025)
22c7b39  Read from in memory buffer (alamb, Apr 14, 2025)
86878ab  Merge remote-tracking branch 'apache/main' into benchmark_row_filter (alamb, Apr 14, 2025)
5ae9b58  celanu (alamb, Apr 14, 2025)
1effe88  Test both sync and async readers (alamb, Apr 14, 2025)
6 changes: 5 additions & 1 deletion parquet/Cargo.toml
@@ -74,7 +74,7 @@ ring = { version = "0.17", default-features = false, features = ["std"], optional

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
-criterion = { version = "0.5", default-features = false }
+criterion = { version = "0.5", default-features = false, features = ["async_futures"] }
snap = { version = "1.0", default-features = false }
tempfile = { version = "3.0", default-features = false }
brotli = { version = "7.0", default-features = false, features = ["std"] }
@@ -211,6 +211,10 @@ name = "arrow_statistics"
required-features = ["arrow"]
harness = false

+[[bench]]
+name = "arrow_reader_row_filter"
+required-features = ["arrow", "async"]
+harness = false

[[bench]]
name = "compression"
325 changes: 325 additions & 0 deletions parquet/benches/arrow_reader_row_filter.rs
@@ -0,0 +1,325 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark for evaluating row filters and projections on a Parquet file.
//!
//! This benchmark creates a Parquet file in memory with 100K rows and four columns:
//! - int64: sequential integers
//! - float64: floating-point values (derived from the integers)
//! - utf8View: string values where about half are non-empty,
//!   and a few rows (every 10Kth row) are the constant "const"
//! - ts: timestamp values (milliseconds since an epoch)
//!
//! It then applies several filter functions and projections, benchmarking the read-back speed.
//!
//! Filters tested:
//! - A string filter: `utf8View <> ''` (non-empty)
//! - A string filter: `utf8View = 'const'` (selective)
//! - An integer non-selective filter (e.g. even numbers)
//! - An integer selective filter (e.g. `int64 = 0`)
//! - A timestamp filter (e.g. `ts > threshold`)
//!
//! Projections tested:
//! - All 4 columns.
//! - All columns except the one used for the filter.
//!
//! To run the benchmark, use `cargo bench --bench arrow_reader_row_filter`.

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::sync::Arc;
use tempfile::NamedTempFile;

use arrow::array::{
    ArrayRef, BooleanArray, BooleanBuilder, Float64Array, Int64Array, TimestampMillisecondArray,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::{Array, StringViewArray};
use criterion::async_executor::FuturesExecutor;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::properties::WriterProperties;
use tokio::fs::File;
use tokio::runtime::Runtime;

/// Create a RecordBatch with 100K rows and four columns.
fn make_record_batch() -> RecordBatch {
    let num_rows = 100_000;

    // int64 column: sequential numbers 0..num_rows
    let int_values: Vec<i64> = (0..num_rows as i64).collect();
Review comment (Contributor):
I think it is more common to use fixed-seed random values when generating data, to avoid artifacts that such regular patterns may introduce.

There are some good examples here: https://github.com/apache/arrow-rs/blob/d0260fcffa07a4cb8650cc290ab29027a3a8e65c/parquet/benches/arrow_writer.rs#L101-L100

Review comment (Contributor):
Specifically, I suggest:

  1. Random ints
  2. Random floats
  3. ordered timestamps (as you have them, as that is quite common)
  4. Random strings

For string view, it is also important to include strings that are more than 12 bytes long (strings less than this are entirely inlined into the view)
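
A minimal sketch of what fixed-seed generation could look like (assuming the `rand` crate's 0.8 API; the function name and constants here are hypothetical, not code from this PR):

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

/// Deterministic "random" column data from a fixed seed, so runs are reproducible.
fn make_random_columns(num_rows: usize) -> (Vec<i64>, Vec<String>) {
    let mut rng = StdRng::seed_from_u64(42); // fixed seed => same data every run
    let ints: Vec<i64> = (0..num_rows).map(|_| rng.gen_range(0..1_000)).collect();
    // Mix short strings (inlined into the view) with strings longer than
    // 12 bytes (stored out of line) so both StringView code paths are hit.
    let strings: Vec<String> = (0..num_rows)
        .map(|_| {
            if rng.gen_bool(0.5) {
                "short".to_string()
            } else {
                format!("a-much-longer-string-value-{:08}", rng.gen_range(0..1_000_000))
            }
        })
        .collect();
    (ints, strings)
}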

I printed out the data; it is quite regular:

     Running benches/arrow_reader_row_filter.rs (target/debug/deps/arrow_reader_row_filter-6404dc89531cf7fd)
Gnuplot not found, using plotters backend
Batch created with 100000 rows
First 100 rows
+-------+---------------------+----------+-------------------------+
| int64 | float64             | utf8View | ts                      |
+-------+---------------------+----------+-------------------------+
| 0     | 0.0                 | const    | 1970-01-01T00:00:00     |
| 1     | 0.1                 |          | 1970-01-01T00:00:00.001 |
| 2     | 0.2                 | nonempty | 1970-01-01T00:00:00.002 |
| 3     | 0.30000000000000004 |          | 1970-01-01T00:00:00.003 |
| 4     | 0.4                 | nonempty | 1970-01-01T00:00:00.004 |
| 5     | 0.5                 |          | 1970-01-01T00:00:00.005 |
| 6     | 0.6000000000000001  | nonempty | 1970-01-01T00:00:00.006 |
| 7     | 0.7000000000000001  |          | 1970-01-01T00:00:00.007 |
| 8     | 0.8                 | nonempty | 1970-01-01T00:00:00.008 |
| 9     | 0.9                 |          | 1970-01-01T00:00:00.009 |
| 10    | 1.0                 | nonempty | 1970-01-01T00:00:00.010 |
| 11    | 1.1                 |          | 1970-01-01T00:00:00.011 |
| 12    | 1.2000000000000002  | nonempty | 1970-01-01T00:00:00.012 |
| 13    | 1.3                 |          | 1970-01-01T00:00:00.013 |
| 14    | 1.4000000000000001  | nonempty | 1970-01-01T00:00:00.014 |
| 15    | 1.5                 |          | 1970-01-01T00:00:00.015 |
| 16    | 1.6                 | nonempty | 1970-01-01T00:00:00.016 |
| 17    | 1.7000000000000002  |          | 1970-01-01T00:00:00.017 |
| 18    | 1.8                 | nonempty | 1970-01-01T00:00:00.018 |
| 19    | 1.9000000000000001  |          | 1970-01-01T00:00:00.019 |
| 20    | 2.0                 | nonempty | 1970-01-01T00:00:00.020 |
| 21    | 2.1                 |          | 1970-01-01T00:00:00.021 |
| 22    | 2.2                 | nonempty | 1970-01-01T00:00:00.022 |
| 23    | 2.3000000000000003  |          | 1970-01-01T00:00:00.023 |
| 24    | 2.4000000000000004  | nonempty | 1970-01-01T00:00:00.024 |
| 25    | 2.5                 |          | 1970-01-01T00:00:00.025 |
| 26    | 2.6                 | nonempty | 1970-01-01T00:00:00.026 |
| 27    | 2.7                 |          | 1970-01-01T00:00:00.027 |
| 28    | 2.8000000000000003  | nonempty | 1970-01-01T00:00:00.028 |
| 29    | 2.9000000000000004  |          | 1970-01-01T00:00:00.029 |
| 30    | 3.0                 | nonempty | 1970-01-01T00:00:00.030 |
| 31    | 3.1                 |          | 1970-01-01T00:00:00.031 |
| 32    | 3.2                 | nonempty | 1970-01-01T00:00:00.032 |
| 33    | 3.3000000000000003  |          | 1970-01-01T00:00:00.033 |
| 34    | 3.4000000000000004  | nonempty | 1970-01-01T00:00:00.034 |
| 35    | 3.5                 |          | 1970-01-01T00:00:00.035 |

I printed this using

diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs
index af07636e49..87e353f2c0 100644
--- a/parquet/benches/arrow_reader_row_filter.rs
+++ b/parquet/benches/arrow_reader_row_filter.rs
@@ -57,6 +57,7 @@ use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMas
 use parquet::file::properties::WriterProperties;
 use tokio::fs::File;
 use tokio::runtime::Runtime;
+use arrow_cast::pretty::pretty_format_batches;

 /// Create a RecordBatch with 100K rows and four columns.
 fn make_record_batch() -> RecordBatch {
@@ -101,11 +102,17 @@ fn make_record_batch() -> RecordBatch {
         ),
     ]));

-    RecordBatch::try_new(
+    let batch = RecordBatch::try_new(
         schema,
         vec![int_array, float_array, utf8_view_array, ts_array],
     )
-    .unwrap()
+    .unwrap();
+
+    // Verify the batch was created correctly
+    println!("Batch created with {} rows", num_rows);
+    println!("First 100 rows");
+    println!("{}", pretty_format_batches(&[batch.clone().slice(0, 100)]).unwrap());
+    batch
 }

    let int_array = Arc::new(Int64Array::from(int_values)) as ArrayRef;

    // float64 column: derived from int64 (e.g., multiplied by 0.1)
    let float_values: Vec<f64> = (0..num_rows).map(|i| i as f64 * 0.1).collect();
    let float_array = Arc::new(Float64Array::from(float_values)) as ArrayRef;

    // utf8View column: even rows get non-empty strings; odd rows get an empty string;
    // every 10Kth even row is "const" to be selective.
    let mut string_view_builder = StringViewBuilder::with_capacity(100_000);
    for i in 0..num_rows {
        if i % 2 == 0 {
            if i % 10_000 == 0 {
                string_view_builder.append_value("const");
            } else {
                string_view_builder.append_value("nonempty");
            }
        } else {
            string_view_builder.append_value("");
        }
    }
    let utf8_view_array = Arc::new(string_view_builder.finish()) as ArrayRef;

    // Timestamp column: using milliseconds from an epoch (simply using the row index)
    let ts_values: Vec<i64> = (0..num_rows as i64).collect();
    let ts_array = Arc::new(TimestampMillisecondArray::from(ts_values)) as ArrayRef;

    let schema = Arc::new(Schema::new(vec![
        Field::new("int64", DataType::Int64, false),
        Field::new("float64", DataType::Float64, false),
        Field::new("utf8View", DataType::Utf8View, false),
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    RecordBatch::try_new(
        schema,
        vec![int_array, float_array, utf8_view_array, ts_array],
    )
    .unwrap()
}

/// Writes the record batch to a temporary Parquet file.
fn write_parquet_file() -> NamedTempFile {
    let batch = make_record_batch();
    let schema = batch.schema();
    let props = WriterProperties::builder().build();

    let file = tempfile::Builder::new()
        .suffix(".parquet")
        .tempfile()
        .unwrap();
    {
        let file_reopen = file.reopen().unwrap();
        let mut writer = ArrowWriter::try_new(file_reopen, schema.clone(), Some(props)).unwrap();
        // Write the entire batch as a single row group.
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }
    file
}

/// Filter function: returns a BooleanArray with true when utf8View <> "".
fn filter_utf8_view_nonempty(batch: &RecordBatch) -> BooleanArray {
Review comment (Contributor):
Since these functions are specific to the FilterType you could potentially add them as methods, like

impl FilterType {
    fn filter_batch(&self, batch: &RecordBatch) -> BooleanArray {
        match self {
            FilterType::Utf8ViewNonEmpty => {
                // implement filter here
            }
            // ...
        }
    }
}

    let array = batch
        .column(batch.schema().index_of("utf8View").unwrap())
        .as_any()
        .downcast_ref::<StringViewArray>()
        .unwrap();
    let mut builder = BooleanBuilder::with_capacity(array.len());
Review comment (Contributor):
I think we should use the arrow compute kernels here, like:

https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html

That will:

  1. Better model what real systems do
  2. Not be a bottleneck for evaluation
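
For example, evaluating `utf8View = 'const'` with a kernel rather than a per-row loop might look like this (a sketch, assuming arrow's `cmp` kernels accept Utf8View columns; this is not the code in this PR):

use arrow::array::{BooleanArray, Scalar, StringViewArray};
use arrow::compute::kernels::cmp::eq;
use arrow::record_batch::RecordBatch;

fn filter_utf8_view_const_kernel(batch: &RecordBatch) -> BooleanArray {
    let column = batch.column(batch.schema().index_of("utf8View").unwrap());
    // Compare the whole column against a single scalar value at once.
    let rhs = Scalar::new(StringViewArray::from_iter_values(["const"]));
    eq(column, &rhs).unwrap()
}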

Reply (Contributor, Author):
Addressed it in latest PR, thank you @alamb !

    for i in 0..array.len() {
        let keep = !array.value(i).is_empty();
        builder.append_value(keep);
    }
    builder.finish()
}

/// Filter function: returns a BooleanArray with true when utf8View == "const".
fn filter_utf8_view_const(batch: &RecordBatch) -> BooleanArray {
    let array = batch
        .column(batch.schema().index_of("utf8View").unwrap())
        .as_any()
        .downcast_ref::<StringViewArray>()
        .unwrap();
    let mut builder = BooleanBuilder::with_capacity(array.len());
    for i in 0..array.len() {
        let keep = array.value(i) == "const";
        builder.append_value(keep);
    }
    builder.finish()
}

/// Integer non-selective filter: returns true for even numbers.
fn filter_int64_even(batch: &RecordBatch) -> BooleanArray {
    let array = batch
        .column(batch.schema().index_of("int64").unwrap())
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let mut builder = BooleanBuilder::with_capacity(array.len());
    for i in 0..array.len() {
        let keep = array.value(i) % 2 == 0;
        builder.append_value(keep);
    }
    builder.finish()
}

/// Integer selective filter: returns true only when int64 equals 0.
fn filter_int64_eq_zero(batch: &RecordBatch) -> BooleanArray {
    let array = batch
        .column(batch.schema().index_of("int64").unwrap())
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let mut builder = BooleanBuilder::with_capacity(array.len());
    for i in 0..array.len() {
        let keep = array.value(i) == 0;
        builder.append_value(keep);
    }
    builder.finish()
}

/// Timestamp filter: returns true when ts > threshold (using 50_000 as example threshold).
fn filter_timestamp_gt(batch: &RecordBatch) -> BooleanArray {
    let array = batch
        .column(batch.schema().index_of("ts").unwrap())
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    let threshold = 50_000;
    let mut builder = BooleanBuilder::with_capacity(array.len());
    for i in 0..array.len() {
        let keep = array.value(i) > threshold;
        builder.append_value(keep);
    }
    builder.finish()
}

#[derive(Clone)]
enum FilterType {
Review comment (Contributor):
I think having the background about why these particular filters were chosen is important. I realize I did not do a good job of describing them on the ticket, but I will now work on some diagrams and descriptions to explain it better.

Review comment (Contributor):
I think it would help here to add ASCII art for each filter pattern in the comments

I can make a PR to do so too. I was thinking we could just copy/paste from
#7363 (comment)
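
To illustrate the idea, the selectivity pattern of each filter could be drawn roughly like this (an illustrative sketch based on the data this benchmark generates, not copied from #7363):

utf8View <> ''      ~50% selected, alternating rows
  row:  0 1 2 3 4 5 6 7 ...
  keep: Y N Y N Y N Y N ...

utf8View = 'const'  highly selective, every 10,000th row
  row:  0 ........ 10000 ........ 20000 ...
  keep: Y    N       Y       N      Y

ts > 50_000         contiguous range, second half of the file
  row:  0 ........ 50000 ........ 99999
  keep: N    N       Y       Y      Y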

    Utf8ViewNonEmpty,
    Utf8ViewConst,
    Int64Even,
    Int64EqZero,
    TimestampGt,
}

impl std::fmt::Display for FilterType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FilterType::Utf8ViewNonEmpty => write!(f, "utf8View <> ''"),
            FilterType::Utf8ViewConst => write!(f, "utf8View = 'const'"),
            FilterType::Int64Even => write!(f, "int64 even"),
            FilterType::Int64EqZero => write!(f, "int64 = 0"),
            FilterType::TimestampGt => write!(f, "ts > 50_000"),
        }
    }
}

fn benchmark_filters_and_projections(c: &mut Criterion) {
    let parquet_file = write_parquet_file();

    let runtime = Runtime::new().unwrap(); // Create a new Tokio runtime

    // Define filter functions associated with each FilterType.
    type FilterFn = fn(&RecordBatch) -> BooleanArray;
    let filter_funcs: Vec<(FilterType, FilterFn)> = vec![
        (FilterType::Utf8ViewNonEmpty, filter_utf8_view_nonempty),
        (FilterType::Utf8ViewConst, filter_utf8_view_const),
        (FilterType::Int64Even, filter_int64_even),
        (FilterType::Int64EqZero, filter_int64_eq_zero),
        (FilterType::TimestampGt, filter_timestamp_gt),
    ];

    let mut group = c.benchmark_group("arrow_reader_row_filter");

    // Iterate by value (FilterType is Clone and fn pointers are Copy)
    for (filter_type, filter_fn) in filter_funcs.into_iter() {
        for proj_case in ["all_columns", "exclude_filter_column"].iter() {
Review comment (Contributor):
I think it would be nice to make proj_case an enum too (rather than a string) -- mostly so we can document the rationale for each of the cases more easily

Reply (Contributor, Author):
Very good point! Thank you @alamb !
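
One way the suggestion could look (a hypothetical sketch; `ProjectionCase` and its variants are illustrative names, not the merged code):

/// Which columns the benchmark reads back.
#[derive(Clone, Copy)]
enum ProjectionCase {
    /// Decode all four columns, including the one the filter reads.
    AllColumns,
    /// Decode every column except the filter column, so that column is
    /// only decoded while evaluating the predicate.
    ExcludeFilterColumn,
}

impl std::fmt::Display for ProjectionCase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProjectionCase::AllColumns => write!(f, "all_columns"),
            ProjectionCase::ExcludeFilterColumn => write!(f, "exclude_filter_column"),
        }
    }
}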

            // Define indices for all columns: [0: "int64", 1: "float64", 2: "utf8View", 3: "ts"]
            let all_indices = vec![0, 1, 2, 3];

            // For the output projection, conditionally exclude the filter column.
            let output_projection: Vec<usize> = if *proj_case == "all_columns" {
                all_indices.clone()
            } else {
                all_indices
                    .into_iter()
                    .filter(|i| match filter_type {
                        FilterType::Utf8ViewNonEmpty | FilterType::Utf8ViewConst => *i != 2, // Exclude "utf8View" (index 2)
                        FilterType::Int64Even | FilterType::Int64EqZero => *i != 0, // Exclude "int64" (index 0)
                        FilterType::TimestampGt => *i != 3, // Exclude "ts" (index 3)
                    })
                    .collect()
            };

            // For predicate pushdown, define a projection that includes the column required for the predicate.
            let predicate_projection: Vec<usize> = match filter_type {
                FilterType::Utf8ViewNonEmpty | FilterType::Utf8ViewConst => vec![2],
                FilterType::Int64Even | FilterType::Int64EqZero => vec![0],
                FilterType::TimestampGt => vec![3],
            };

            // Create a benchmark id combining filter type and projection case.
            let bench_id = BenchmarkId::new(
                format!("filter_case: {} project_case: {}", filter_type, proj_case),
                "",
            );

            group.bench_function(bench_id, |b| {
                b.to_async(FuturesExecutor).iter(|| async {
                    runtime.block_on(async {
                        // Reopen the Parquet file for each iteration.
                        let file = File::open(parquet_file.path()).await.unwrap();

                        // Create an async parquet reader builder with batch_size.
                        let options = ArrowReaderOptions::new().with_page_index(true);

                        let builder =
                            ParquetRecordBatchStreamBuilder::new_with_options(file, options)
                                .await
                                .unwrap()
                                .with_batch_size(8192);

                        let file_metadata = builder.metadata().file_metadata().clone();

                        let mask = ProjectionMask::roots(
                            file_metadata.schema_descr(),
                            output_projection.clone(),
                        );

                        let pred_mask = ProjectionMask::roots(
                            file_metadata.schema_descr(),
                            predicate_projection.clone(),
                        );

                        let f = filter_fn;
                        let filter = ArrowPredicateFn::new(pred_mask, move |batch: RecordBatch| {
                            Ok(f(&batch))
                        });
                        let stream = builder
                            .with_projection(mask)
                            .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
                            .build()
                            .unwrap();

                        // Collect the results into a vector of RecordBatches.
                        stream.try_collect::<Vec<_>>().await.unwrap();
                    })
                });
            });
        }
    }
}

criterion_group!(benches, benchmark_filters_and_projections);
criterion_main!(benches);