
remove arrow dependency from data skipping #132


Merged
merged 11 commits into from
Mar 14, 2024
8 changes: 6 additions & 2 deletions kernel/Cargo.toml
@@ -9,8 +9,6 @@ readme.workspace = true
version.workspace = true

[dependencies]
arrow-array = { version = "^49.0" }
arrow-select = { version = "^49.0" }
bytes = "1.4"
chrono = { version = "0.4" }
either = "1.8"
@@ -33,6 +31,8 @@ z85 = "3.0.5"
visibility = "0.1.0"

# Used in default client
arrow-array = { version = "^49.0", optional = true }
arrow-select = { version = "^49.0", optional = true }
arrow-arith = { version = "^49.0", optional = true }
arrow-json = { version = "^49.0", optional = true }
arrow-ord = { version = "^49.0", optional = true }
@@ -49,11 +49,13 @@ tokio = { version = "1", optional = true, features = ["rt-multi-thread"] }
arrow-conversion = ["arrow-schema"]
default = ["simple-client"]
default-client = [
"arrow-array",
"arrow-conversion",
"arrow-arith",
"arrow-json",
"arrow-ord",
"arrow-schema",
"arrow-select",
"futures",
"object_store",
"parquet/async",
@@ -63,8 +65,10 @@ default-client = [

developer-visibility = []
simple-client = [
"arrow-array",
"arrow-conversion",
"arrow-json",
"arrow-select",
"parquet"
]

15 changes: 15 additions & 0 deletions kernel/src/actions/visitors.rs
@@ -62,6 +62,21 @@ impl DataVisitor for MetadataVisitor {
}
}

#[derive(Default)]
pub(crate) struct SelectionVectorVisitor {
pub(crate) selection_vector: Vec<bool>,
}

impl DataVisitor for SelectionVectorVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
self.selection_vector
.push(getters[0].get(i, "selectionvector.output")?)
}
Ok(())
}
}

#[derive(Default)]
pub(crate) struct ProtocolVisitor {
pub(crate) protocol: Option<Protocol>,
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
@@ -95,7 +95,7 @@ pub struct FileMeta {
/// Connectors can implement this interface to optimize the evaluation using the
/// connector specific capabilities.
pub trait ExpressionEvaluator {
/// Evaluate the expression on given ColumnarBatch data.
/// Evaluate the expression on a given EngineData.
///
/// Contains one value for each row of the input.
/// The data type of the output is same as the type output of the expression this evaluator is using.
85 changes: 36 additions & 49 deletions kernel/src/scan/data_skipping.rs
@@ -1,14 +1,12 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::{Array, BooleanArray};
use arrow_select::filter::filter_record_batch;
use tracing::debug;

use crate::error::{DeltaResult, Error};
use crate::actions::visitors::SelectionVectorVisitor;
use crate::error::DeltaResult;
use crate::expressions::{BinaryOperator, Expression as Expr, VariadicOperator};
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::simple_client::data::SimpleData;
use crate::{EngineData, EngineInterface, ExpressionEvaluator, JsonHandler};

/// Returns <op2> (if any) such that B <op2> A is equivalent to A <op> B.
@@ -116,8 +114,8 @@ impl DataSkippingFilter {
static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![
StructField::new("predicate", DataType::BOOLEAN, true),
]).into();
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
static ref STATS_EXPR: Expr = Expr::column("add.stats");
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
Collaborator Author

Note: I did a little reorganization here, just to list things in the order they are used.
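
For readers unfamiliar with the `FILTER_EXPR` in this hunk: per the comments in `data_skipping.rs`, `DISTINCT(col(predicate), false)` yields true when the predicate is true or null, and false only when the predicate is false. A minimal sketch of that null-safe comparison, assuming nulls are modelled as `Option<bool>`. The helper names `is_distinct_from` and `to_selection_vector` are hypothetical and not part of the kernel:

```rust
/// Null-safe inequality ("IS DISTINCT FROM") over nullable booleans.
/// `None` stands in for a SQL NULL; this is an illustrative model only.
fn is_distinct_from(lhs: Option<bool>, rhs: Option<bool>) -> bool {
    // Option's equality already treats None == None as true and
    // None != Some(_) as true, which matches null-safe comparison.
    lhs != rhs
}

/// Turn a three-valued skipping predicate (true / false / null) into a
/// two-valued selection vector: false means "skip", true means "keep".
fn to_selection_vector(predicate: &[Option<bool>]) -> Vec<bool> {
    predicate
        .iter()
        .map(|p| is_distinct_from(*p, Some(false)))
        .collect()
}
```

Under this model only rows whose stats prove the file can be skipped map to false; rows with missing stats (null) are kept.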

);

let predicate = match predicate {
@@ -147,17 +145,21 @@ impl DataSkippingFilter {

// Skipping happens in several steps:
//
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A
// value of true means the stats say we must keep the file, and null means we could not
// determine whether the file is safe to skip, because its stats were missing/null.
//
// 2. The nullif(skip, skip) converts true (= keep) to null, producing a result
// that contains only false (= skip) and null (= keep) values.
// 1. The stats selector fetches add.stats from the metadata
//
// 3. The is_null converts null to true, producing a result that contains only true (=
// keep) and false (= skip) values.
// 2. The predicate (skipping evaluator) produces false for any file whose stats prove we
// can safely skip it. A value of true means the stats say we must keep the file, and
// null means we could not determine whether the file is safe to skip, because its stats
// were missing/null.
//
// 4. The filter discards every file whose selection vector entry is false.
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
STATS_EXPR.clone(),
DataType::STRING,
);

let skipping_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
Expr::struct_expr([as_data_skipping_predicate(predicate)?]),
@@ -170,12 +172,6 @@ impl DataSkippingFilter {
DataType::BOOLEAN,
);

let select_stats_evaluator = table_client.get_expression_handler().get_evaluator(
stats_schema.clone(),
STATS_EXPR.clone(),
DataType::STRING,
);

Some(Self {
stats_schema,
select_stats_evaluator,
@@ -185,44 +181,35 @@ impl DataSkippingFilter {
})
}

// TODO(nick): This should not be expressed in terms of SimpleData, but should use only the
// expression API
pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>> {
/// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector
/// which can be applied to the actions to find those that passed data skipping.
pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
// retrieve and parse stats from actions data
let stats = self.select_stats_evaluator.evaluate(actions)?;
let parsed_stats = self
.json_handler
.parse_json(stats, self.stats_schema.clone())?;

// evaluate the predicate on the parsed stats, then convert to selection vector
let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?;

let skipping_vector = self
let selection_vector = self
.filter_evaluator
.evaluate(skipping_predicate.as_ref())?;
let skipping_vector = skipping_vector
.as_any()
.downcast_ref::<SimpleData>()
.ok_or(Error::engine_data_type("SimpleData"))?
.record_batch()
.column(0);
let skipping_vector = skipping_vector
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::unexpected_column_type(
"Expected type 'BooleanArray'.",
))?;

let before_count = actions.length();
let actions = actions
.as_any()
.downcast_ref::<SimpleData>()
.ok_or(Error::engine_data_type("SimpleData"))?
.record_batch();
let after = filter_record_batch(actions, skipping_vector)?;
debug!(
"number of actions before/after data skipping: {before_count} / {}",
after.num_rows()
);
Ok(Box::new(SimpleData::new(after)))
// visit the engine's selection vector to produce a Vec<bool>
let mut visitor = SelectionVectorVisitor::default();
let schema = StructType::new(vec![StructField::new("output", DataType::BOOLEAN, false)]);
selection_vector
.as_ref()
.extract(Arc::new(schema), &mut visitor)?;
Ok(visitor.selection_vector)

// TODO(zach): add some debug info about data skipping that occurred
Collaborator

This will need to happen at a higher level, or we could count the number of `true` entries in the selection vector.

Collaborator Author

meh kinda just left this open for now lol
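
One way the "count the trues" idea from this thread could look, as a rough sketch only. It assumes the selection vector is the `Vec<bool>` returned by `apply`; `count_selected` and `skipping_summary` are hypothetical names, and the message text simply mirrors the commented-out `debug!` call:

```rust
/// Hypothetical helper: number of rows a selection vector keeps.
fn count_selected(selection_vector: &[bool]) -> usize {
    selection_vector.iter().filter(|keep| **keep).count()
}

/// Hypothetical helper: build the before/after message that could be
/// handed to a logging macro such as `tracing::debug!`.
fn skipping_summary(selection_vector: &[bool]) -> String {
    format!(
        "number of actions before/after data skipping: {} / {}",
        selection_vector.len(),
        count_selected(selection_vector)
    )
}
```

Because the selection vector has one entry per input row, its length doubles as the "before" count, so no access to the original batch is needed.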

// let before_count = actions.length();
// debug!(
// "number of actions before/after data skipping: {before_count} / {}",
// filtered_actions.num_rows()
// );
}
}

36 changes: 26 additions & 10 deletions kernel/src/scan/file_stream.rs
@@ -24,21 +24,39 @@ struct LogReplayScanner {
struct AddRemoveVisitor {
adds: Vec<Add>,
removes: Vec<Remove>,
selection_vector: Option<Vec<bool>>,
}

const ADD_FIELD_COUNT: usize = 15;

impl AddRemoveVisitor {
fn new(selection_vector: Option<Vec<bool>>) -> Self {
AddRemoveVisitor {
selection_vector,
..Default::default()
}
}
}

impl DataVisitor for AddRemoveVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
// Add will have a path at index 0 if it is valid
if let Some(path) = getters[0].get_opt(i, "add.path")? {
self.adds
.push(AddVisitor::visit_add(i, path, &getters[..ADD_FIELD_COUNT])?);
// Keep the file unless the selection vector is present and is false for this row
if !self
.selection_vector
.as_ref()
.is_some_and(|selection| !selection[i])
{
self.adds
.push(AddVisitor::visit_add(i, path, &getters[..ADD_FIELD_COUNT])?)
Comment on lines +50 to +56
Contributor

Gotta love functional languages, where the "simpler" approach actually produces more lines of code 🤦
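
For comparison, the same "keep unless the selection vector is present and false" condition can be written without the double negation. This is only a sketch of an equivalent formulation, not what the PR merged; `keep_row` is a hypothetical helper name:

```rust
/// Hypothetical helper: should row `i` be kept?
/// `None` means no data skipping was applied, so every row is kept.
/// Like the original indexing, this panics if `i` is out of bounds.
fn keep_row(selection_vector: Option<&[bool]>, i: usize) -> bool {
    selection_vector.map_or(true, |selection| selection[i])
}
```

With such a helper, the check in the visitor would reduce to `if keep_row(self.selection_vector.as_deref(), i) { ... }`.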

}
}
// Remove will have a path at index 15 if it is valid
// TODO(nick): Should count the fields in Add to ensure we don't get this wrong if more
// are added
// TODO(zach): add a check for selection vector that we never skip a remove
else if let Some(path) = getters[ADD_FIELD_COUNT].get_opt(i, "remove.path")? {
let remove_getters = &getters[ADD_FIELD_COUNT..];
self.removes
@@ -70,15 +88,13 @@ impl LogReplayScanner {
actions: &dyn EngineData,
is_log_batch: bool,
) -> DeltaResult<Vec<Add>> {
Contributor

If we applied the JVM kernel approach here, this method would take the engine data as input, and return a selection vector that covers both data skipping and log replay for rows of this batch. The engine data would not be modified at all, and engine is free to apply the filtering however it wishes.

This also avoids the need to (pay the cost to) parse EngineData rows into Add and Remove structs, which in turn avoids the need to expose those structs as part of the public API.

Collaborator Author

> return a selection vector that covers both data skipping and log replay for rows of this batch

Oh, so the selection vector would just 'pick' which actions (all of which would be Add actions) represent the valid files after data skipping and after applying removes from the seen set?

> This also avoids the need to (pay the cost to) parse EngineData rows into Add and Remove structs, which in turn avoids the need to expose those structs as part of the public API.

We would still need to inspect the path/dv in order to add to the 'seen' set and perform the log replay (remove action) filtering, correct? You're just suggesting doing this on the fly instead of parsing into structs?

Contributor

@ryan-johnson-databricks, Mar 12, 2024

Yes, the visitor isn't required to parse the exploded fields into a struct! It could just ask for just the fields we need to examine, update the selection vector accordingly, and let the engine data continue providing the actual rows.
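
A rough sketch of the approach discussed in this thread, under several assumptions: rows are modelled with a simplified, hypothetical `ActionRow` enum instead of real `EngineData` getters, batches are replayed newest first, and an add that failed data skipping is not recorded in the seen set. None of these names come from the kernel:

```rust
use std::collections::HashSet;

/// Simplified stand-in for one row of an actions batch (hypothetical).
/// Real code would read `path` and the DV id through getters instead.
enum ActionRow {
    Add { path: String, dv_id: Option<String> },
    Remove { path: String, dv_id: Option<String> },
    Other,
}

/// Produce one selection vector covering both data skipping and log replay.
/// `skipping` is the data skipping result (None = nothing was skipped) and
/// `seen` carries (path, dv id) keys across batches.
fn replay_batch(
    rows: &[ActionRow],
    skipping: Option<&[bool]>,
    seen: &mut HashSet<(String, Option<String>)>,
) -> Vec<bool> {
    rows.iter()
        .enumerate()
        .map(|(i, row)| match row {
            ActionRow::Add { path, dv_id } => {
                let passed_skipping = skipping.map_or(true, |s| s[i]);
                // `insert` returns false when the key was already present,
                // i.e. a newer action for this file has been replayed.
                passed_skipping && seen.insert((path.clone(), dv_id.clone()))
            }
            ActionRow::Remove { path, dv_id } => {
                // Remember the tombstone; a remove is never selected itself.
                seen.insert((path.clone(), dv_id.clone()));
                false
            }
            ActionRow::Other => false,
        })
        .collect()
}
```

The batch itself is never modified or copied into `Add`/`Remove` structs; the caller gets back only the vector and can apply it to the engine data however it likes.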

let filtered_actions = self
// apply data skipping to get back a selection vector for actions that passed skipping
// note: None implies all files passed data skipping.
let selection_vector = self
.filter
.as_ref()
.map(|filter| filter.apply(actions))
.transpose()?;
let actions = match filtered_actions {
Some(ref filtered_actions) => filtered_actions.as_ref(),
None => actions,
};

let schema_to_use = StructType::new(if is_log_batch {
vec![
@@ -90,12 +106,12 @@ impl LogReplayScanner {
// only serve as tombstones for vacuum jobs. So no need to load them here.
vec![crate::actions::schemas::ADD_FIELD.clone()]
Contributor

I don't think this works -- the visitor blindly accesses getters[ADD_FIELD_COUNT], which will panic out of bounds if/when we ever have !is_log_batch. At a minimum, we need to preserve remove.path so the not-null check can skip it, but that would also require us to filter out removes from checkpoint parts at scan level, so that remove.path is always null (which also reduces the cost of the scan by not fetching those columns in the first place).

Collaborator

Ahh, great catch. Can't we just propagate is_log_batch into the visitor, and not try to look for removes if it's false?

Contributor

That could work as a short-term mitigation, yes. Longer term, we want the schema filtering to happen higher up in the stack, so there isn't even a remove column and the add.path IS NOT NULL check is pushed down into the scan.

Collaborator Author

fixed with short-term here :)
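
To illustrate the short-term mitigation discussed in this thread (propagating `is_log_batch` into the visitor so it never indexes a remove column that is not there), here is a simplified sketch. It assumes columns are plain vectors of optional strings rather than `GetData` getters; `Column`, `PathVisitor`, and `remove_index` are hypothetical names:

```rust
/// Stand-in for a column getter: one optional string per row (hypothetical).
type Column = Vec<Option<String>>;

struct PathVisitor {
    add_paths: Vec<String>,
    remove_paths: Vec<String>,
    /// False for checkpoint batches, whose schema has no remove columns.
    is_log_batch: bool,
}

impl PathVisitor {
    /// `remove_index` plays the role of ADD_FIELD_COUNT in the real visitor.
    fn visit(&mut self, row_count: usize, getters: &[Column], remove_index: usize) {
        for i in 0..row_count {
            if let Some(path) = &getters[0][i] {
                self.add_paths.push(path.clone());
            } else if self.is_log_batch {
                // Only log batches carry remove columns, so only in that
                // case is `getters[remove_index]` guaranteed to exist.
                if let Some(path) = &getters[remove_index][i] {
                    self.remove_paths.push(path.clone());
                }
            }
        }
    }
}
```

With `is_log_batch` set to false, a batch that holds only the add columns is visited without touching `getters[remove_index]`, which avoids the out-of-bounds panic raised in the review.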

});
let mut visitor = AddRemoveVisitor::default();
Collaborator Author

checkpoint bugfix: passing is_log_batch down to visitor

let mut visitor = AddRemoveVisitor::new(selection_vector);
actions.extract(Arc::new(schema_to_use), &mut visitor)?;

for remove in visitor.removes.into_iter() {
Contributor

NOTE: We don't need to process removes before adds within any given batch, because each batch is a subset of some commit, and a given commit cannot legally contain more than one action for a given path. We just need to remember them in self.seen in case the next batch needs them.

Collaborator Author

Ah yep, +1. I can fix this in a separate (small) PR?

Collaborator

What are we fixing though? It's good to know that we don't need to look at the removes first, but it's also not wrong.

The visitor has separated them for us already, so it's not any extra cost to do them first.

Collaborator

Small nit for something I noticed below on line 98 (sorry, GitHub won't let me comment there). You can remove a clone:

        for remove in visitor.removes.into_iter() {
            let dv_id = remove.dv_unique_id();
            self.seen.insert((remove.path, dv_id));
        }

Collaborator Author

Removed the clone :) I can open an issue if we want to modify the way we process removes.

Contributor

> What are we fixing though? It's good to know that we don't need to look at the removes first, but it's also not wrong.

It requires materializing (arbitrarily large) lists of adds and removes, which kernel shouldn't be in the business of doing in the first place. We should really be returning the original engine data along with updated selection vector, since the engine already went to the trouble of allocating all the actions for us there.

Collaborator

@nicklan, Mar 12, 2024

Got it. This is a more general "fix the way scan works" kind of change. If I understand correctly, your model would switch scan to an iterator, and not pass around Adds really ever. This code would only generate the selection vector, and return the batch as "underlying data + vector". Then the next method on scan would actually poke at the engine data to get the add file paths, dvs, and filter out removed adds, and return data as read by the engine. I've noted that in #123.

I'd say we merge this PR just to get arrow out, and then look at changing the return type of scan along the lines of #123 to make this more efficient.

@ryan-johnson-databricks does that make sense to you?

Collaborator Author

wooooo!

self.seen
.insert((remove.path.clone(), remove.dv_unique_id()));
let dv_id = remove.dv_unique_id();
self.seen.insert((remove.path, dv_id));
}

visitor