
Added Filter and Projection Pushdown support to ParquetIO and Beam SQL's ParquetTable #35589

Open · wants to merge 3 commits into master

Conversation

talatuyarer (Contributor)

This PR fixes #19748

This pull request introduces support for filter and projection pushdown to the ParquetIO connector when used as a Beam SQL source, significantly improving query performance by reducing the amount of data read from disk and transferred over the network.
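
Conceptually, pushdown translates the SQL WHERE clause into a Parquet FilterPredicate and the SELECT column list into a reduced read schema before any records are decoded. A minimal, runnable sketch of the filter side using the standard Parquet FilterApi (the column names mirror the ProductInfo test table discussed below; this is illustrative, not the PR's implementation):

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PushdownSketch {
  public static void main(String[] args) {
    // WHERE price > 100.0 AND is_stocked = true, expressed as a Parquet record filter
    // that can skip row groups and records before they reach the pipeline.
    FilterPredicate filter =
        FilterApi.and(
            FilterApi.gt(FilterApi.doubleColumn("price"), 100.0),
            FilterApi.eq(FilterApi.booleanColumn("is_stocked"), true));
    System.out.println(filter);
    // Projection pushdown is the analogous step for columns: only the fields the query
    // references are kept in the Avro read schema handed to ParquetIO.
  }
}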


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.


Assigning reviewers:

R: @m-trieu for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@ahmedabu98 (Contributor) left a comment


Thanks @talatuyarer, left some comments

Comment on lines +102 to +103
private static FilterPredicate convert(RexNode e, Schema beamSchema) {
SqlKind kind = e.getKind();

Do we need to validate that RexNode e is indeed a RexCall?
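
For illustration, a guard along these lines would make that assumption explicit before casting (a sketch, not the PR's code; Beam SQL uses vendored Calcite packages, so RexCall comes from the vendored namespace in practice):

private static FilterPredicate convert(RexNode e, Schema beamSchema) {
  // Only calls (comparisons, AND/OR/NOT, IN, ...) can be translated; bare input refs,
  // literals and dynamic parameters have no standalone Parquet equivalent.
  if (!(e instanceof RexCall)) {
    throw new UnsupportedOperationException(
        "Cannot push down non-call filter expression: " + e);
  }
  RexCall call = (RexCall) e;
  switch (call.getKind()) {
    // ... existing dispatch on SqlKind ...
    default:
      throw new UnsupportedOperationException("Unsupported filter kind: " + call.getKind());
  }
}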

List<RexNode> valueNodes = call.getOperands().subList(1, call.getOperands().size());
SqlKind comparison = isNotIn ? SqlKind.NOT_EQUALS : SqlKind.EQUALS;

// CHANGE: Use an explicit loop

Elaborate what this means? Should this be a TODO?
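
For context, the expansion such a loop typically performs: col IN (v1, ..., vn) folds into a chain of equality predicates joined with OR, and NOT IN into inequality predicates joined with AND. A runnable sketch with Parquet's FilterApi (column name and values are illustrative):

import java.util.Arrays;
import java.util.List;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;

public class InListExpansion {
  public static void main(String[] args) {
    IntColumn col = FilterApi.intColumn("category_id");
    List<Integer> values = Arrays.asList(1, 2, 3);
    boolean isNotIn = false;

    FilterPredicate combined = null;
    for (Integer v : values) {
      FilterPredicate term = isNotIn ? FilterApi.notEq(col, v) : FilterApi.eq(col, v);
      if (combined == null) {
        combined = term;
      } else {
        // IN folds with OR; NOT IN folds with AND (De Morgan).
        combined = isNotIn ? FilterApi.and(combined, term) : FilterApi.or(combined, term);
      }
    }
    System.out.println(combined); // prints the folded predicate
  }
}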

return beamSchema.getField(columnRef.getIndex()).getName();
}

private static FilterPredicate createIntPredicate(SqlKind kind, String name, Integer value) {

I think the create___Predicate methods can be merged into one method. A lot of it seems to be duplicated logic
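
For illustration, FilterApi's bounded type parameters allow one generic builder to cover the int, long, float, double and binary cases; boolean columns, which only support eq/notEq, would still need a small special case. A sketch (not the PR's code; SqlKind is the vendored Calcite enum in Beam):

import org.apache.calcite.sql.SqlKind;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq;
import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt;

class ComparisonPredicates {
  // One builder for every column type that supports both eq/notEq and lt/gt.
  static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq & SupportsLtGt>
      FilterPredicate create(SqlKind kind, C column, T value) {
    switch (kind) {
      case EQUALS:
        return FilterApi.eq(column, value);
      case NOT_EQUALS:
        return FilterApi.notEq(column, value);
      case LESS_THAN:
        return FilterApi.lt(column, value);
      case LESS_THAN_OR_EQUAL:
        return FilterApi.ltEq(column, value);
      case GREATER_THAN:
        return FilterApi.gt(column, value);
      case GREATER_THAN_OR_EQUAL:
        return FilterApi.gtEq(column, value);
      default:
        throw new UnsupportedOperationException("Unsupported comparison: " + kind);
    }
  }
}

// Call sites would then differ only in the column constructor, e.g.
//   ComparisonPredicates.create(kind, FilterApi.intColumn(name), (Integer) value);
//   ComparisonPredicates.create(kind, FilterApi.doubleColumn(name), (Double) value);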

Comment on lines +269 to +276
case DATE:
case TIME:
case TIMESTAMP:
case ARRAY:
case MAP:
case ROW:
default:
return null;

This will suppress bad filters and may confuse users who expect it to work. Can we throw an UnsupportedOperationException instead?
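
Concretely, the suggestion amounts to replacing the fall-through return with a throw (a sketch; typeName stands in for whatever the surrounding switch dispatches on):

case DATE:
case TIME:
case TIMESTAMP:
case ARRAY:
case MAP:
case ROW:
default:
  // Fail loudly rather than silently dropping the filter.
  throw new UnsupportedOperationException(
      "Filter pushdown is not supported for type: " + typeName);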

Comment on lines +207 to +208
default:
return null;

throw an UnsupportedOperationException?

@@ -340,6 +345,12 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) {
.build();
}

/** Specifies a filter predicate to use for filtering records. */
public Read withFilter(ParquetFilter predicate) {

How would a Java user make use of this? It relies on ParquetFilter being an instance of ParquetFilterImpl, which is not available to the user.
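
To make the question concrete: direct Java usage would need some public way to obtain a ParquetFilter, for example a factory that wraps a standard Parquet FilterPredicate. The fromPredicate method below is hypothetical, purely to illustrate the kind of entry point being asked about:

import org.apache.avro.Schema;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

// Hypothetical usage sketch; ParquetFilterFactory.fromPredicate(...) is illustrative,
// not an existing method in this PR's public API.
Schema avroSchema = new Schema.Parser().parse(schemaJson); // schemaJson: the files' Avro schema
FilterPredicate recentOrders =
    FilterApi.gtEq(FilterApi.longColumn("order_ts_millis"), 1_700_000_000_000L);

ParquetIO.Read read =
    ParquetIO.read(avroSchema)
        .from("gs://my-bucket/orders/*.parquet")
        .withFilter(ParquetFilterFactory.fromPredicate(recentOrders));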

import org.apache.parquet.filter2.predicate.Operators.LongColumn;
import org.apache.parquet.io.api.Binary;

public class ParquetFilterFactory {

We need thorough testing for this class
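
As a starting point, unit tests could assert that each SQL comparison yields the expected Parquet predicate. The conversion call below is a placeholder, since the factory's public surface isn't shown in this snippet:

import static org.junit.Assert.assertEquals;

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.junit.Test;

public class ParquetFilterFactoryTest {
  @Test
  public void testGreaterThanOnDoubleColumn() {
    FilterPredicate expected = FilterApi.gt(FilterApi.doubleColumn("price"), 100.0);
    // Whatever entry point the factory exposes, converting "price > 100.0"
    // should produce the hand-built predicate above.
    FilterPredicate actual = convertSqlFilterUnderTest("price > 100.0");
    assertEquals(expected, actual);
  }

  // Placeholder for the factory invocation being tested; not a real API.
  private FilterPredicate convertSqlFilterUnderTest(String sqlPredicate) {
    throw new UnsupportedOperationException("wire up ParquetFilterFactory here");
  }
}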

Schema projectionSchema = projectSchema(schema, fieldNames);
LOG.info("Projecting fields schema: {}", projectionSchema);
read = read.withProjection(projectionSchema, projectionSchema);
read = read.withProjection(readSchema, readSchema);

I think we should be making the unwanted columns nullable for the second readSchema. Check ParquetIO javadoc:

* <p>Reading with projection can be enabled with the projection schema as following. Splittable
* reading is enabled when reading with projection. The projection_schema contains only the column
* that we would like to read and encoder_schema contains the schema to encode the output with the
* unwanted columns changed to nullable. Partial reading provide decrease of reading time due to
* partial processing of the data and partial encoding. The decrease in the reading time depends on
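
For illustration, the encoder schema could be derived from the file schema by wrapping every non-projected field in a nullable union, which is what the quoted javadoc asks for (a sketch assuming Avro 1.9+ constructors; names are illustrative, not the PR's code):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;

// Keep projected fields as-is; make every other field nullable in the encoder schema.
private static Schema toEncoderSchema(Schema fileSchema, Set<String> projectedFieldNames) {
  List<Schema.Field> fields = new ArrayList<>();
  for (Schema.Field f : fileSchema.getFields()) {
    Schema fieldSchema = f.schema();
    boolean projected = projectedFieldNames.contains(f.name());
    if (!projected && fieldSchema.getType() != Schema.Type.UNION) {
      // Unwanted columns become nullable so the encoder can emit records without them.
      fieldSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), fieldSchema);
    }
    fields.add(new Schema.Field(f.name(), fieldSchema, f.doc(), null));
  }
  return Schema.createRecord(
      fileSchema.getName(), fileSchema.getDoc(), fileSchema.getNamespace(), false, fields);
}

// Then: read.withProjection(projectionSchema, toEncoderSchema(fileSchema, projectedFieldNames));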

Comment on lines +94 to +101
@Parameter(3)
public long expectedReadCount;

@Parameter(4)
public List<Row> expectedRows;

@Parameter(5)
public Schema expectedSchema;

I think we don't need expectedReadCount or expectedSchema, right? That information is already captured in expectedRows

Comment on lines +144 to +145
"SELECT * FROM ProductInfo WHERE price > ? AND is_stocked = ?",
Arrays.asList(100.0, true),

Are the values inside the array supposed to be substituted for the "?" placeholders? If so, the output seems to be incorrect.


Successfully merging this pull request may close these issues.

ParquetTable.buildIOReader should support column projection and filter predicate