Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable #35589
Conversation
Assigning reviewers: R: @m-trieu for label java. Note: if you would like to opt out of this review, comment with one of the bot's available commands. The PR bot will only process comments in the main thread (not review comments).
Thanks @talatuyarer, left some comments
private static FilterPredicate convert(RexNode e, Schema beamSchema) {
  SqlKind kind = e.getKind();
Do we need to validate that RexNode e is indeed a RexCall?
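A minimal pure-Java sketch of that guard, for illustration. The RexNode/RexCall types below are stand-ins for Calcite's classes, just to show the shape of the check; this is not the PR's actual code.

```java
public class ConvertGuard {
  // Stand-ins for Calcite's RexNode/RexCall (assumption: convert() can
  // receive any RexNode, e.g. a bare input reference, not only calls).
  interface RexNode {}
  static class RexCall implements RexNode {}
  static class RexInputRef implements RexNode {}

  // Guard up front instead of casting blindly; returning null lets the
  // caller fall back to reading without pushdown.
  static String convert(RexNode e) {
    if (!(e instanceof RexCall)) {
      return null;
    }
    RexCall call = (RexCall) e;
    return "converted:" + call.getClass().getSimpleName();
  }

  public static void main(String[] args) {
    System.out.println(convert(new RexCall()));     // converted:RexCall
    System.out.println(convert(new RexInputRef())); // null
  }
}
```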
List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size());
SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS;

// CHANGE: Use an explicit loop
Can you elaborate on what this means? Should this be a TODO?
return beamSchema.getField(columnRef.getIndex()).getName();
}

private static FilterPredicate createIntPredicate(SqlKind kind, String name, Integer value) {
I think the create___Predicate methods can be merged into one method. A lot of it seems to be duplicated logic.
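A pure-Java sketch of the merged shape, assuming the duplicated bodies differ only in the value type. Strings stand in for Parquet's FilterPredicate here, and the names are illustrative, not Beam's actual API; with the real FilterApi the differing column types (IntColumn vs LongColumn etc.) would still need some per-type plumbing.

```java
public class PredicateMerge {
  enum SqlKind { EQUALS, NOT_EQUALS, GREATER_THAN, LESS_THAN }

  // One generic method replacing createIntPredicate / createLongPredicate /
  // createStringPredicate: the comparison logic is shared, only the value
  // type varies.
  static <T extends Comparable<T>> String createPredicate(SqlKind kind, String name, T value) {
    switch (kind) {
      case EQUALS:
        return name + " = " + value;
      case NOT_EQUALS:
        return name + " != " + value;
      case GREATER_THAN:
        return name + " > " + value;
      case LESS_THAN:
        return name + " < " + value;
      default:
        throw new UnsupportedOperationException("Unsupported kind: " + kind);
    }
  }

  public static void main(String[] args) {
    System.out.println(createPredicate(SqlKind.GREATER_THAN, "price", 100)); // price > 100
    System.out.println(createPredicate(SqlKind.EQUALS, "name", "beam"));     // name = beam
  }
}
```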
case DATE:
case TIME:
case TIMESTAMP:
case ARRAY:
case MAP:
case ROW:
default:
  return null;
This will suppress bad filters and may confuse users who expect it to work. Can we throw an UnsupportedOperationException instead?
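A sketch of failing loudly instead, with illustrative names (not Beam's actual method or type names): supported types build a predicate, everything else throws so the user learns immediately that the filter cannot be pushed down.

```java
public class UnsupportedFilterType {
  enum TypeName { INT32, INT64, STRING, DATE, MAP, ROW }

  // Throw instead of silently returning null, so a filter on an
  // unsupported column type produces a clear error rather than being
  // quietly dropped.
  static String toPredicate(TypeName type, String column) {
    switch (type) {
      case INT32:
      case INT64:
      case STRING:
        return column; // supported types would build a real predicate here
      default:
        throw new UnsupportedOperationException(
            "Filter pushdown is not supported for column type: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(toPredicate(TypeName.INT32, "price")); // price
    try {
      toPredicate(TypeName.MAP, "attributes");
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```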
default:
  return null;
Throw an UnsupportedOperationException here as well?
@@ -340,6 +345,12 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) {
      .build();
}

/** Specifies a filter predicate to use for filtering records. */
public Read withFilter(ParquetFilter predicate) {
How would a Java user make use of this? This relies on ParquetFilter being an instance of ParquetFilterImpl, which is not available to the user.
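One conventional fix is to keep the implementation hidden and expose public static factory methods that return the interface type. A self-contained sketch with illustrative names (the nested ParquetFilterImpl here mirrors the hidden-impl concern, not the PR's actual classes):

```java
public class FilterFactoryDemo {
  // Public API surface: users program against the interface only.
  public interface ParquetFilter {
    String describe();
  }

  // Hidden implementation, analogous to ParquetFilterImpl: users never
  // name this type directly.
  private static final class ParquetFilterImpl implements ParquetFilter {
    private final String expr;
    private ParquetFilterImpl(String expr) { this.expr = expr; }
    @Override public String describe() { return expr; }
  }

  // Public factories return the interface type, so a method like
  // withFilter(ParquetFilter) stays usable without exposing the impl.
  public static ParquetFilter gt(String column, long value) {
    return new ParquetFilterImpl(column + " > " + value);
  }

  public static ParquetFilter eq(String column, Object value) {
    return new ParquetFilterImpl(column + " = " + value);
  }

  public static void main(String[] args) {
    ParquetFilter f = gt("price", 100);
    System.out.println(f.describe()); // price > 100
  }
}
```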
import org.apache.parquet.filter2.predicate.Operators.LongColumn;
import org.apache.parquet.io.api.Binary;

public class ParquetFilterFactory {
We need thorough testing for this class.
Schema projectionSchema = projectSchema(schema, fieldNames);
LOG.info("Projecting fields schema: {}", projectionSchema);
read = read.withProjection(projectionSchema, projectionSchema);
read = read.withProjection(readSchema, readSchema);
I think we should be making the unwanted columns nullable for the second readSchema. Check the ParquetIO javadoc:
beam/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
Lines 137 to 141 in cd72a25
* <p>Reading with projection can be enabled with the projection schema as following. Splittable
* reading is enabled when reading with projection. The projection_schema contains only the column
* that we would like to read and encoder_schema contains the schema to encode the output with the
* unwanted columns changed to nullable. Partial reading provide decrease of reading time due to
* partial processing of the data and partial encoding. The decrease in the reading time depends on
@Parameter(3)
public long expectedReadCount;

@Parameter(4)
public List<Row> expectedRows;

@Parameter(5)
public Schema expectedSchema;
I think we don't need expectedReadCount or expectedSchema, right? That information is already captured in expectedRows.
"SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?",
Arrays.asList(100.0, true),
Are the values inside the array supposed to be substituted for "?"? If so, the output seems to be incorrect.
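For reference, a naive JDBC-style positional binder showing what the substituted query would look like for the test inputs above. This is a sketch of the intended substitution semantics, not Beam SQL's actual parameter handling.

```java
import java.util.Arrays;
import java.util.List;

public class ParamSubst {
  // Replace each '?' with the next value, in order.
  static String bind(String sql, List<Object> values) {
    StringBuilder out = new StringBuilder();
    int next = 0;
    for (char c : sql.toCharArray()) {
      if (c == '?' && next < values.size()) {
        out.append(values.get(next++));
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(bind(
        "SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?",
        Arrays.asList(100.0, true)));
    // SELECT * FROM ProductInfo WHERE price > 100.0 AND is_stocked = true
  }
}
```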
This PR fixes #19748
This pull request introduces support for filter and projection pushdown to the ParquetIO connector when used as a Beam SQL source, significantly improving query performance by reducing the amount of data read from disk and transferred over the network.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.