remove arrow dependency from data skipping #132
Conversation
static ref STATS_EXPR: Expr = Expr::column("add.stats");
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
Note: I did just a little bit of reorganization, to list things in the order they are used.
Nice! This is awesome.
Can we also ensure we make arrow fully optional in `Cargo.toml`?
.extract(Arc::new(schema), &mut visitor)?;
Ok(visitor.selection_vector)

// TODO(zach): add some debug info about data skipping that occurred
This will need to happen at the higher level, or the selection vector could count the number of `true`s it has.
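A selection vector is just a per-row boolean mask, so counting the surviving rows is trivial; a minimal sketch (hypothetical helper name, not kernel API):

```rust
// Count how many rows a selection vector keeps (true = row selected).
fn count_selected(selection: &[bool]) -> usize {
    selection.iter().filter(|&&keep| keep).count()
}
```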
meh kinda just left this open for now lol
@@ -70,15 +86,13 @@ impl LogReplayScanner {
    actions: &dyn EngineData,
    is_log_batch: bool,
) -> DeltaResult<Vec<Add>> {
If we applied the JVM kernel approach here, this method would take the engine data as input and return a selection vector that covers both data skipping and log replay for the rows of this batch. The engine data would not be modified at all, and the engine is free to apply the filtering however it wishes.
This also avoids the need to (pay the cost to) parse `EngineData` rows into `Add` and `Remove` structs, which in turn avoids the need to expose those structs as part of the public API.
> return a selection vector that covers both data skipping and log replay for rows of this batch

Oh, like the selection vector would just 'pick' which actions (which would all be `Add` actions) represent the valid files after data skipping/applying removes from the seen set?

> This also avoids the need to (pay the cost to) parse EngineData rows into Add and Remove structs, which in turn avoids the need to expose those structs as part of the public API.

We still would need to inspect the path/dv in order to add to the 'seen' set and perform the log replay (remove action) filtering, correct? You're just suggesting to do this on-the-fly instead of parsing into structs?
Yes, the visitor isn't required to parse the exploded fields into a struct! It could ask for just the fields we need to examine, update the selection vector accordingly, and let the engine data continue providing the actual rows.
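A rough sketch of that idea, with all names hypothetical: the visitor inspects only the path and DV id per row, flips selection-vector entries in place, and never materializes `Add`/`Remove` structs:

```rust
use std::collections::HashSet;

// Hypothetical visitor: updates the selection vector in place instead of
// parsing rows into Add/Remove structs.
struct SelectionVisitor {
    selection: Vec<bool>,
    seen: HashSet<(String, Option<String>)>,
}

impl SelectionVisitor {
    // Called once per row, with only the fields log replay needs.
    fn visit_row(&mut self, i: usize, path: String, dv_id: Option<String>, is_remove: bool) {
        let key = (path, dv_id);
        if is_remove || self.seen.contains(&key) {
            // Removes, and (path, dv) pairs already seen, are deselected.
            self.selection[i] = false;
        }
        self.seen.insert(key);
    }
}
```

The engine then applies the final selection vector to the unmodified batch however it likes.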
@@ -90,7 +104,7 @@ impl LogReplayScanner {
     // only serve as tombstones for vacuum jobs. So no need to load them here.
     vec![crate::actions::schemas::ADD_FIELD.clone()]
 });
-let mut visitor = AddRemoveVisitor::default();
+let mut visitor = AddRemoveVisitor::new(selection_vector);
 actions.extract(Arc::new(schema_to_use), &mut visitor)?;
for remove in visitor.removes.into_iter() {
NOTE: We don't need to process removes before adds within any given batch, because each batch is a subset of some commit, and a given commit cannot legally contain more than one action for a given path. We just need to remember them in `self.seen` in case the next batch needs them.
ah yep +1 I can fix in a separate (small) PR?
What are we fixing though? It's good to know that we don't need to look at the removes first, but it's also not wrong.
The visitor has separated them for us already, so it's not any extra cost to do them first.
Small nit for something I noticed below on line 98 (sorry, github won't let me comment there). You can remove a `clone`:

for remove in visitor.removes.into_iter() {
    let dv_id = remove.dv_unique_id();
    self.seen.insert((remove.path, dv_id));
}
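For context, the reason `into_iter()` lets the clone go away is that it yields owned `Remove` values, so `remove.path` can be moved into the set rather than cloned. A self-contained sketch with a simplified, hypothetical `Remove`:

```rust
use std::collections::HashSet;

// Hypothetical, simplified stand-in for the kernel's Remove action.
struct Remove {
    path: String,
    dv_id: Option<String>,
}

fn collect_seen(removes: Vec<Remove>) -> HashSet<(String, Option<String>)> {
    let mut seen = HashSet::new();
    // into_iter() yields owned Remove values, so `remove.path` is moved,
    // not cloned, into the seen set.
    for remove in removes.into_iter() {
        seen.insert((remove.path, remove.dv_id));
    }
    seen
}
```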
Removed the clone :) And I can open an issue if we want to modify the way we process removes?
> What are we fixing though? It's good to know that we don't need to look at the removes first, but it's also not wrong.

It requires materializing (arbitrarily large) lists of adds and removes, which kernel shouldn't be in the business of doing in the first place. We should really be returning the original engine data along with an updated selection vector, since the engine already went to the trouble of allocating all the actions for us there.
Got it. This is a more general "fix the way scan works" kind of change. If I understand correctly, your model would switch scan to an iterator, and not really ever pass around `Add`s. This code would only generate the selection vector and return the batch as "underlying data + vector". Then the `next` method on `scan` would actually poke at the engine data to get the add file paths and dvs, filter out removed adds, and return data as read by the engine. I've noted that in #123.
I'd say we merge this PR just to get arrow out, and then look at changing the return type of `scan` along the lines of #123 to make this more efficient.
@ryan-johnson-databricks does that make sense to you?
wooooo!
@@ -90,7 +104,7 @@ impl LogReplayScanner {
     // only serve as tombstones for vacuum jobs. So no need to load them here.
     vec![crate::actions::schemas::ADD_FIELD.clone()]
I don't think this works -- the visitor blindly accesses `getters[ADD_FIELD_COUNT]`, which will panic out of bounds if/when we ever have `!is_log_batch`. At a minimum, we need to preserve `remove.path` so the not-null check can skip it, but that would also require us to filter out removes from checkpoint parts at scan level, so that `remove.path` is always null (which also reduces the cost of the scan by not fetching those columns in the first place).
Ahh, great catch. Can't we just propagate `is_log_batch` into the visitor, and not try to look for removes if it's false?
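A sketch of that mitigation (hypothetical, simplified shapes): the visitor records whether the batch came from the commit log and only inspects remove columns when it did, so checkpoint batches never index past the add getters:

```rust
// Hypothetical, simplified AddRemoveVisitor illustrating the short-term fix.
struct AddRemoveVisitor {
    is_log_batch: bool,
    adds: Vec<String>,
    removes: Vec<String>,
}

impl AddRemoveVisitor {
    fn new(is_log_batch: bool) -> Self {
        AddRemoveVisitor { is_log_batch, adds: Vec::new(), removes: Vec::new() }
    }

    // add_path / remove_path stand in for the real getter lookups; checkpoint
    // batches carry no remove columns, so we must not look for them there.
    fn visit_row(&mut self, add_path: Option<String>, remove_path: Option<String>) {
        if let Some(path) = add_path {
            self.adds.push(path);
        } else if self.is_log_batch {
            if let Some(path) = remove_path {
                self.removes.push(path);
            }
        }
    }
}
```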
That could work as a short-term mitigation, yes. Longer term, we want the schema filtering to happen higher up in the stack, so there isn't even a remove column and the `add.path IS NOT NULL` check is pushed down into the scan.
fixed with the short-term mitigation here :)
Co-authored-by: Nick Lanham <[email protected]>
Co-authored-by: Ryan Johnson <[email protected]>
AFAIK, the actual changes from this PR are good and badly needed. All outstanding concerns are pre-existing issues or future work that shouldn't delay merge.
if !self
    .selection_vector
    .as_ref()
    .is_some_and(|selection| !selection[i])
{
    self.adds
        .push(AddVisitor::visit_add(i, path, &getters[..ADD_FIELD_COUNT])?)
Gotta love functional languages, where the "simpler" approach actually produces more lines of code 🤦
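For readability, the double-negated predicate above can be extracted into a helper: keep row `i` unless a selection vector exists and marks it false. A standalone sketch (hypothetical function name):

```rust
// Keep row i unless a selection vector exists and marks the row false.
// A missing (None) selection vector means "keep everything".
fn is_selected(selection_vector: &Option<Vec<bool>>, i: usize) -> bool {
    !selection_vector.as_ref().is_some_and(|selection| !selection[i])
}
```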
.map_ok(|batch| (batch, true));

let parquet_client = engine_interface.get_parquet_handler();
let checkpoint_stream = parquet_client
-    .read_parquet_files(&self.checkpoint_files, read_schema, predicate)?
+    .read_parquet_files(&self.checkpoint_files, checkpoint_read_schema, predicate)?
I guess still TODO to push the not-null predicates down to the scan for columns that survived pruning?
Yea, adding a comment and I'll open an issue.
@@ -90,12 +111,12 @@ impl LogReplayScanner {
     // only serve as tombstones for vacuum jobs. So no need to load them here.
     vec![crate::actions::schemas::ADD_FIELD.clone()]
 });
 let mut visitor = AddRemoveVisitor::default();
checkpoint bugfix: passing `is_log_batch` down to the visitor
// b) recursed into an optional struct that was null. In this case, array.is_none() is
//    true and we don't need to check field nullability, because we assume all fields
//    of a nullable struct can be null
// So below, if the field is allowed to be null OR array.is_none(), we push that;
// otherwise we error out.
if let Some(col) = col {
    Self::extract_column(out_col_array, field, col)?;
-} else if field.is_nullable() {
-    if let DataType::Struct(_) = field.data_type() {
-        Self::extract_columns_from_array(out_col_array, schema, None)?;
+} else if array.is_none() || field.is_nullable() {
+    if let DataType::Struct(inner_struct) = field.data_type() {
+        Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?;
from nick :)
nice thanks, lgtm
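The nullability rule from that diff can be isolated into a tiny decision function (illustrative only, not the real extraction API): a missing column is acceptable when the field is nullable OR the enclosing array itself is absent, i.e. we recursed into a null optional struct whose fields are all implicitly null:

```rust
// Illustrative: decide whether a missing column value is legal.
// array_is_none = true means we recursed into a null optional struct.
fn may_be_null(field_nullable: bool, array_is_none: bool) -> Result<(), String> {
    if array_is_none || field_nullable {
        Ok(())
    } else {
        Err("found null value for non-nullable field".to_string())
    }
}
```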
Removed the arrow dep from the data skipping code. We were previously using `filter_record_batch`. Now, instead of eagerly performing a filter, the new data skipping code produces a selection vector; `AddRemoveVisitor` consumes this selection vector to apply the filtering as the actions are iterated.

Resolves #126

ADDITIONALLY: fixed a bug in checkpoint reads. We now read the appropriate adds/removes for commits and only adds for checkpoints. Also a bugfix for simpleclient data extraction.
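The flow the description outlines can be sketched as lazy filtering over a selection vector (hypothetical helper, not the kernel API): rather than eagerly rewriting the batch with something like arrow's `filter_record_batch`, the mask is applied as the actions are iterated:

```rust
// Apply a selection vector lazily while iterating actions, instead of
// eagerly materializing a filtered batch.
fn filter_by_selection<T: Clone>(actions: &[T], selection: &[bool]) -> Vec<T> {
    actions
        .iter()
        .zip(selection)
        .filter(|&(_, &keep)| keep)
        .map(|(action, _)| action.clone())
        .collect()
}
```

This keeps the original engine data untouched; the engine decides how (and whether) to materialize the filtered result.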