Add support for file row numbers in Parquet readers #7307
Thanks for your submission @jkylling, I'll try to get a first pass review done this week. In the meantime please add the Apache license to row_number.rs and correct the other lint errors. 🙏
Updated. Looking forward to the first review! I was very confused as to why cargo format did not work properly, but it looks like you are already aware of this (#6179) :)
Partial review, just a few nits for now.
Thanks again @jkylling for taking this on. I've finished my first pass and have only one reservation. Otherwise it looks good and meets the criteria set forth in #7299 (comment).
        row_groups: VecDeque::from(
            row_groups
                .into_iter()
                .map(TryInto::try_into)
                .collect::<Result<Vec<_>>>()?,
        ),
    })
I'm finding myself a bit uneasy with adding the first row number to the `RowGroupMetaData`. Rather than that, could this bit here instead be changed to keep track of the first row number while populating the deque? Is there some wrinkle I'm missing? Might the row groups be filtered before instantiating the `RowNumberReader`?
Answered my own question...it seems there's some complexity here at least when using the async reader.
Yes, I believe we don't have access to all row groups when creating the array readers.
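To illustrate the wrinkle discussed in this thread, here is a minimal, self-contained sketch. It does not use the parquet crate: `RowGroupSpan`, `spans_from_row_counts`, and `spans_for_selected` are hypothetical names introduced only for this example. A running sum over row counts yields correct first row numbers only when it sees every row group in the file; if row groups were pruned beforehand, the sum silently restarts from the wrong offset.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the per-row-group state a row number reader needs.
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupSpan {
    /// File-level index of the first row in the row group.
    pub first_row_index: i64,
    /// Number of rows in the row group.
    pub num_rows: i64,
}

/// Assigns each row group its first row index with a running sum.
/// This is only correct if `row_counts` covers every row group in file order.
pub fn spans_from_row_counts(row_counts: &[i64]) -> VecDeque<RowGroupSpan> {
    let mut next_first_row = 0i64;
    row_counts
        .iter()
        .map(|&num_rows| {
            let span = RowGroupSpan {
                first_row_index: next_first_row,
                num_rows,
            };
            next_first_row += num_rows;
            span
        })
        .collect()
}

/// Computes the spans over all row groups first and filters afterwards,
/// so that pruned row groups still contribute to the offsets.
pub fn spans_for_selected(all_row_counts: &[i64], selected: &[usize]) -> VecDeque<RowGroupSpan> {
    let all = spans_from_row_counts(all_row_counts);
    selected.iter().filter_map(|&i| all.get(i).cloned()).collect()
}
```

With row counts `[100, 50, 25]` and row group 0 pruned, `spans_for_selected` keeps the second row group at first row index 100, whereas calling `spans_from_row_counts` on the already pruned counts `[50, 25]` would place it at 0.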
I took a quick look at the corresponding Parquet reader implementations for Trino and parquet-java.
Trino:
- Has a boolean to include a row number column, https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L112
- Includes this column when the boolean is set: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L337
- Has a special block reader for reading row indexes: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L385-L393 I believe the positions play a similar role to our `RowSelectors`.
- Gets row indexes from `RowGroupInfo`, a pruned version of https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L456
- Populates the `fileRowOffset` by iterating through the row groups: https://github.com/trinodb/trino/blob/master/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java#L107-L111
parquet-java:
- Has a method for tracking the current row index: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L150-L155
- This row index is based on an iterator which starts from a row group row index: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java#L311-L339
- This row group row index is initialized by iterating through the row groups: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1654-L1656 (mapping obtained here: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1496-L1506)
Their approaches are rather similar to ours.
One takeaway is that the above implementations do not keep the full `RowGroupMetaData`s around, whereas we do by requiring an iterator over `RowGroupMetaData` in the `RowGroups` trait. Not keeping them is likely a good idea, as this struct can be quite large. What do you think about changing the `RowGroups` trait to something like below?
    /// A collection of row groups
    pub trait RowGroups {
        /// Get the number of rows in this collection
        fn num_rows(&self) -> usize {
            // Default implementation: sum the row counts via the accessor method below.
            self.row_group_infos().map(|info| info.num_rows).sum()
        }

        /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
        fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

        /// Returns an iterator over the row groups in this collection
        fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;
    }

    /// Lightweight per-row-group summary
    pub struct RowGroupInfo {
        /// Number of rows in the row group
        num_rows: usize,
        /// File-level index of the first row in the row group
        row_index: i64,
    }
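As a rough illustration of how an implementor of the proposed trait might look, here is a simplified, self-contained sketch. It is an assumption-laden reduction, not the crate's actual API: `column_chunks` is left out because `PageIterator` lives in the parquet crate, and `SelectedRowGroups` is a hypothetical in-memory implementor invented for this example.

```rust
/// Lightweight per-row-group summary, mirroring the proposal above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupInfo {
    /// Number of rows in the row group.
    pub num_rows: usize,
    /// File-level index of the first row in the row group.
    pub row_index: i64,
}

/// Simplified version of the proposed trait (no `column_chunks`).
pub trait RowGroups {
    /// Returns an iterator over the row groups in this collection.
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;

    /// Default implementation derived from `row_group_infos`.
    fn num_rows(&self) -> usize {
        self.row_group_infos().map(|info| info.num_rows).sum()
    }
}

/// Hypothetical implementor holding only the selected row groups.
pub struct SelectedRowGroups {
    pub infos: Vec<RowGroupInfo>,
}

impl RowGroups for SelectedRowGroups {
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_> {
        Box::new(self.infos.iter())
    }
}
```

Because `num_rows` has a default body, an implementor only needs to supply `row_group_infos`, and each `RowGroupInfo` stays two machine words regardless of how many columns the file has.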
parquet/src/file/metadata/mod.rs
Outdated
@@ -584,6 +585,11 @@ impl RowGroupMetaData {
        self.num_rows
    }

    /// Returns the first row number in this row group.
Suggested change:
-    /// Returns the first row number in this row group.
+    /// Returns the global index number for the first row in this row group.
And perhaps use `first_row_index` instead? That may be clearer.
Agree. Updated.
Which issue does this PR close?
Closes #7299.
What changes are included in this PR?
In this PR we:
- Add a method to `ArrowReaderBuilder` to set a `row_number_column` used to extend the read `RecordBatch`es with an additional column with file row numbers.
- Add an `ArrayReader` to the vector of `ArrayReader`s reading columns from the Parquet file, if the `row_number_column` is set in the reader configuration. This is a `RowNumberReader`, which is a special `ArrayReader`. It reads no data from the Parquet pages, but uses the first row numbers in the `RowGroupMetaData` to keep track of progress.

The `RowGroupMetaData::first_row_number` is `Option<i64>`, since it is possible that the row number is unknown (I encountered an instance of this when trying to integrate this PR in delta-rs), and it's better if `None` is used instead of some special integer value.

The performance impact of this PR should be negligible when the row number column is not set. The only additional overhead would be the tracking of the `first_row_number` of each row group.

Are there any user-facing changes?
We add an additional public method:
- `ArrowReaderBuilder::with_row_number_column`
There are a few breaking changes as we touch a few public interfaces:
- `RowGroupMetaData::from_thrift` and `RowGroupMetaData::from_thrift_encrypted` take an additional parameter `first_row_number: Option<i64>`.
- `RowGroups` has an additional method `RowGroups::row_groups`. Potentially this method could replace the `RowGroups::num_rows` method or provide a default implementation for it.
- A new error variant, `ParquetError::RowGroupMetaDataMissingRowNumber`.

I'm very open to suggestions on how to reduce the amount of breaking changes.
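
For readers unfamiliar with the idea behind the `RowNumberReader` described under "What changes are included in this PR?", the following is a minimal sketch of how row numbers can be produced without touching any page data. It is not the PR's implementation and does not implement the crate's `ArrayReader` trait: `RowNumberCursor` and its methods are hypothetical, and each row group is assumed to be given as a `(first_row_number, num_rows)` pair.

```rust
use std::collections::VecDeque;

/// Hypothetical cursor that yields file row numbers across selected row groups.
pub struct RowNumberCursor {
    /// `(first_row_number, num_rows)` for each remaining row group, in read order.
    remaining: VecDeque<(i64, i64)>,
    /// Number of rows already consumed from the front row group.
    offset: i64,
    /// Row numbers read since the last `consume_batch` call.
    buffered: Vec<i64>,
}

impl RowNumberCursor {
    pub fn new(row_groups: Vec<(i64, i64)>) -> Self {
        Self {
            remaining: row_groups.into(),
            offset: 0,
            buffered: Vec::new(),
        }
    }

    /// Advances by up to `n` rows; buffers the row numbers only when `emit` is true.
    /// Returns how many rows were actually advanced.
    fn advance(&mut self, n: usize, emit: bool) -> usize {
        let mut done = 0usize;
        while done < n {
            let Some(&(first, num_rows)) = self.remaining.front() else {
                break; // No row groups left.
            };
            let available = (num_rows - self.offset) as usize;
            if available == 0 {
                // Front row group exhausted: move on to the next one.
                self.remaining.pop_front();
                self.offset = 0;
                continue;
            }
            let take = available.min(n - done);
            if emit {
                let start = first + self.offset;
                self.buffered.extend(start..start + take as i64);
            }
            self.offset += take as i64;
            done += take;
        }
        done
    }

    /// Buffers the next `n` row numbers (fewer if the row groups run out).
    pub fn read_records(&mut self, n: usize) -> usize {
        self.advance(n, true)
    }

    /// Skips `n` rows without buffering their row numbers.
    pub fn skip_records(&mut self, n: usize) -> usize {
        self.advance(n, false)
    }

    /// Returns the buffered row numbers and clears the buffer.
    pub fn consume_batch(&mut self) -> Vec<i64> {
        std::mem::take(&mut self.buffered)
    }
}
```

The skip path does the same bookkeeping as the read path but emits nothing, which is why row selections and pruned row groups (visible here as gaps between the `first_row_number` values) do not disturb the numbering.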