-
Notifications
You must be signed in to change notification settings - Fork 81
remove arrow dependency from data skipping #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
d948434
42f3f3d
aebf692
aba9d83
ba7fc7e
7bee3a6
5ee02c3
9758b28
7ce02ce
30ae91f
3be3279
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,12 @@ | ||
use std::collections::HashSet; | ||
use std::sync::Arc; | ||
|
||
use arrow_array::{Array, BooleanArray}; | ||
use arrow_select::filter::filter_record_batch; | ||
use tracing::debug; | ||
|
||
use crate::error::{DeltaResult, Error}; | ||
use crate::actions::visitors::SelectionVectorVisitor; | ||
use crate::error::DeltaResult; | ||
use crate::expressions::{BinaryOperator, Expression as Expr, VariadicOperator}; | ||
use crate::schema::{DataType, SchemaRef, StructField, StructType}; | ||
use crate::simple_client::data::SimpleData; | ||
use crate::{EngineData, EngineInterface, ExpressionEvaluator, JsonHandler}; | ||
|
||
/// Returns <op2> (if any) such that B <op2> A is equivalent to A <op> B. | ||
|
@@ -116,8 +114,8 @@ impl DataSkippingFilter { | |
static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![ | ||
StructField::new("predicate", DataType::BOOLEAN, true), | ||
]).into(); | ||
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false)); | ||
static ref STATS_EXPR: Expr = Expr::column("add.stats"); | ||
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false)); | ||
); | ||
|
||
let predicate = match predicate { | ||
|
@@ -147,17 +145,21 @@ impl DataSkippingFilter { | |
|
||
// Skipping happens in several steps: | ||
// | ||
// 1. The predicate produces false for any file whose stats prove we can safely skip it. A | ||
// value of true means the stats say we must keep the file, and null means we could not | ||
// determine whether the file is safe to skip, because its stats were missing/null. | ||
// | ||
// 2. The nullif(skip, skip) converts true (= keep) to null, producing a result | ||
// that contains only false (= skip) and null (= keep) values. | ||
// 1. The stats selector fetches add.stats from the metadata | ||
// | ||
// 3. The is_null converts null to true, producing a result that contains only true (= | ||
// keep) and false (= skip) values. | ||
// 2. The predicate (skipping evaluator) produces false for any file whose stats prove we | ||
// can safely skip it. A value of true means the stats say we must keep the file, and | ||
// null means we could not determine whether the file is safe to skip, because its stats | ||
// were missing/null. | ||
// | ||
// 4. The filter discards every file whose selection vector entry is false. | ||
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when | ||
// the predicate is true/null and false (= skip) when the predicate is false. | ||
let select_stats_evaluator = table_client.get_expression_handler().get_evaluator( | ||
stats_schema.clone(), | ||
STATS_EXPR.clone(), | ||
DataType::STRING, | ||
); | ||
|
||
let skipping_evaluator = table_client.get_expression_handler().get_evaluator( | ||
stats_schema.clone(), | ||
Expr::struct_expr([as_data_skipping_predicate(predicate)?]), | ||
|
@@ -170,12 +172,6 @@ impl DataSkippingFilter { | |
DataType::BOOLEAN, | ||
); | ||
|
||
let select_stats_evaluator = table_client.get_expression_handler().get_evaluator( | ||
stats_schema.clone(), | ||
STATS_EXPR.clone(), | ||
DataType::STRING, | ||
); | ||
|
||
Some(Self { | ||
stats_schema, | ||
select_stats_evaluator, | ||
|
@@ -185,44 +181,35 @@ impl DataSkippingFilter { | |
}) | ||
} | ||
|
||
// TODO(nick): This should not be expressed in terms of SimpleData, but should use only the | ||
// expression API | ||
pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>> { | ||
/// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector | ||
/// which can be applied to the actions to find those that passed data skipping. | ||
pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> { | ||
// retrieve and parse stats from actions data | ||
let stats = self.select_stats_evaluator.evaluate(actions)?; | ||
let parsed_stats = self | ||
.json_handler | ||
.parse_json(stats, self.stats_schema.clone())?; | ||
|
||
// evaluate the predicate on the parsed stats, then convert to selection vector | ||
let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?; | ||
|
||
let skipping_vector = self | ||
let selection_vector = self | ||
.filter_evaluator | ||
.evaluate(skipping_predicate.as_ref())?; | ||
let skipping_vector = skipping_vector | ||
.as_any() | ||
.downcast_ref::<SimpleData>() | ||
.ok_or(Error::engine_data_type("SimpleData"))? | ||
.record_batch() | ||
.column(0); | ||
let skipping_vector = skipping_vector | ||
.as_any() | ||
.downcast_ref::<BooleanArray>() | ||
.ok_or(Error::unexpected_column_type( | ||
"Expected type 'BooleanArray'.", | ||
))?; | ||
|
||
let before_count = actions.length(); | ||
let actions = actions | ||
.as_any() | ||
.downcast_ref::<SimpleData>() | ||
.ok_or(Error::engine_data_type("SimpleData"))? | ||
.record_batch(); | ||
let after = filter_record_batch(actions, skipping_vector)?; | ||
debug!( | ||
"number of actions before/after data skipping: {before_count} / {}", | ||
after.num_rows() | ||
); | ||
Ok(Box::new(SimpleData::new(after))) | ||
// visit the engine's selection vector to produce a Vec<bool> | ||
let mut visitor = SelectionVectorVisitor::default(); | ||
let schema = StructType::new(vec![StructField::new("output", DataType::BOOLEAN, false)]); | ||
selection_vector | ||
.as_ref() | ||
.extract(Arc::new(schema), &mut visitor)?; | ||
Ok(visitor.selection_vector) | ||
|
||
// TODO(zach): add some debug info about data skipping that occurred | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will need to happen at the higher level, or the selection vector could count the # of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. meh kinda just left this open for now lol |
||
// let before_count = actions.length(); | ||
// debug!( | ||
// "number of actions before/after data skipping: {before_count} / {}", | ||
// filtered_actions.num_rows() | ||
// ); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,21 +24,39 @@ struct LogReplayScanner { | |
struct AddRemoveVisitor { | ||
adds: Vec<Add>, | ||
removes: Vec<Remove>, | ||
selection_vector: Option<Vec<bool>>, | ||
} | ||
|
||
const ADD_FIELD_COUNT: usize = 15; | ||
|
||
impl AddRemoveVisitor { | ||
fn new(selection_vector: Option<Vec<bool>>) -> Self { | ||
AddRemoveVisitor { | ||
selection_vector, | ||
..Default::default() | ||
} | ||
} | ||
} | ||
|
||
impl DataVisitor for AddRemoveVisitor { | ||
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { | ||
for i in 0..row_count { | ||
// Add will have a path at index 0 if it is valid | ||
if let Some(path) = getters[0].get_opt(i, "add.path")? { | ||
self.adds | ||
.push(AddVisitor::visit_add(i, path, &getters[..ADD_FIELD_COUNT])?); | ||
// Keep the file unless the selection vector is present and is false for this row | ||
if !self | ||
.selection_vector | ||
.as_ref() | ||
.is_some_and(|selection| !selection[i]) | ||
{ | ||
self.adds | ||
.push(AddVisitor::visit_add(i, path, &getters[..ADD_FIELD_COUNT])?) | ||
Comment on lines
+50
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotta love functional languages, where the "simpler" approach actually produces more lines of code 🤦 |
||
} | ||
} | ||
// Remove will have a path at index 15 if it is valid | ||
// TODO(nick): Should count the fields in Add to ensure we don't get this wrong if more | ||
// are added | ||
// TODO(zach): add a check for selection vector that we never skip a remove | ||
else if let Some(path) = getters[ADD_FIELD_COUNT].get_opt(i, "remove.path")? { | ||
let remove_getters = &getters[ADD_FIELD_COUNT..]; | ||
self.removes | ||
|
@@ -70,15 +88,13 @@ impl LogReplayScanner { | |
actions: &dyn EngineData, | ||
is_log_batch: bool, | ||
) -> DeltaResult<Vec<Add>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we applied the JVM kernel approach here, this method would take the engine data as input, and return a selection vector that covers both data skipping and log replay for rows of this batch. The engine data would not be modified at all, and engine is free to apply the filtering however it wishes. This also avoids the need to (pay the cost to) parse There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh like the selection vector would just 'pick' which actions (which would all be
we still would need to inspect the path/dv in order to add to the 'seen' set and perform the log replay (remove action) filtering, correct? You're just suggesting to do this on-the-fly instead of parsing into structs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the visitor isn't required to parse the exploded fields into a struct! It could just ask for just the fields we need to examine, update the selection vector accordingly, and let the engine data continue providing the actual rows. |
||
let filtered_actions = self | ||
// apply data skipping to get back a selection vector for actions that passed skipping | ||
// note: None implies all files passed data skipping. | ||
let selection_vector = self | ||
.filter | ||
.as_ref() | ||
.map(|filter| filter.apply(actions)) | ||
.transpose()?; | ||
let actions = match filtered_actions { | ||
Some(ref filtered_actions) => filtered_actions.as_ref(), | ||
None => actions, | ||
}; | ||
|
||
let schema_to_use = StructType::new(if is_log_batch { | ||
vec![ | ||
|
@@ -90,12 +106,12 @@ impl LogReplayScanner { | |
// only serve as tombstones for vacuum jobs. So no need to load them here. | ||
vec![crate::actions::schemas::ADD_FIELD.clone()] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this works -- the visitor blindly accesses There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, great catch. Can't we just propagate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That could work as a short-term mitigation, yes. Longer term, we want the schema filtering to happen higher up in the stack, so there isn't even a remove column and the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed with short-term here :) |
||
}); | ||
let mut visitor = AddRemoveVisitor::default(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checkpoint bugfix: passing is_log_batch down to visitor |
||
let mut visitor = AddRemoveVisitor::new(selection_vector); | ||
actions.extract(Arc::new(schema_to_use), &mut visitor)?; | ||
|
||
for remove in visitor.removes.into_iter() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NOTE: We don't need to process removes before adds within any given batch, because each batch is a subset of some commit, and a given commit cannot legally contain more than one action for a given path. We just need to remember them in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah yep +1 I can fix in a separate (small) PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are we fixing though? It's good to know that we don't need to look at the removes first, but it's also not wrong. The visitor has separated them for us already, so it's not any extra cost to do them first. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. small nit for something I noticed below on line 98 (sorry, github won't let me comment there). You can remove a for remove in visitor.removes.into_iter() {
let dv_id = remove.dv_unique_id();
self.seen.insert((remove.path, dv_id));
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed the clone :) and can open an issue if we want to modify the way we process removes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It requires materializing (arbitrarily large) lists of adds and removes, which kernel shouldn't be in the business of doing in the first place. We should really be returning the original engine data along with updated selection vector, since the engine already went to the trouble of allocating all the actions for us there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. This is a more general "fix the way scan works" kind of change. If I understand correctly, your model would switch scan to an iterator, and not pass around I'd say we merge this PR just to get arrow out, and then look at changing the return type of @ryan-johnson-databricks does that make sense to you? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wooooo! |
||
self.seen | ||
.insert((remove.path.clone(), remove.dv_unique_id())); | ||
let dv_id = remove.dv_unique_id(); | ||
self.seen.insert((remove.path, dv_id)); | ||
} | ||
|
||
visitor | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note i did just a little bit of reorganization just to list things in the order they are used.