Skip to content

Commit 1a5999a

Browse files
authored
Simplify ParquetRecordBatchReader control logic (#7512)
1 parent 847534d commit 1a5999a

File tree

1 file changed

+30
-29
lines changed
  • parquet/src/arrow/arrow_reader

1 file changed

+30
-29
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -800,24 +800,33 @@ impl Iterator for ParquetRecordBatchReader {
800800
type Item = Result<RecordBatch, ArrowError>;
801801

802802
fn next(&mut self) -> Option<Self::Item> {
803+
self.next_inner()
804+
.map_err(|arrow_err| arrow_err.into())
805+
.transpose()
806+
}
807+
}
808+
809+
impl ParquetRecordBatchReader {
810+
/// Returns the next `RecordBatch` from the reader, or `None` if the reader
811+
/// has no more rows to yield (end of input, or the row selection is exhausted).
812+
///
813+
/// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
814+
/// simplify error handling with `?`.
815+
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
803816
let mut read_records = 0;
804817
match self.selection.as_mut() {
805818
Some(selection) => {
806819
while read_records < self.batch_size && !selection.is_empty() {
807820
let front = selection.pop_front().unwrap();
808821
if front.skip {
809-
let skipped = match self.array_reader.skip_records(front.row_count) {
810-
Ok(skipped) => skipped,
811-
Err(e) => return Some(Err(e.into())),
812-
};
822+
let skipped = self.array_reader.skip_records(front.row_count)?;
813823

814824
if skipped != front.row_count {
815-
return Some(Err(general_err!(
825+
return Err(general_err!(
816826
"failed to skip rows, expected {}, got {}",
817827
front.row_count,
818828
skipped
819-
)
820-
.into()));
829+
));
821830
}
822831
continue;
823832
}
@@ -839,35 +848,27 @@ impl Iterator for ParquetRecordBatchReader {
839848
}
840849
_ => front.row_count,
841850
};
842-
match self.array_reader.read_records(to_read) {
843-
Ok(0) => break,
844-
Ok(rec) => read_records += rec,
845-
Err(error) => return Some(Err(error.into())),
846-
}
851+
match self.array_reader.read_records(to_read)? {
852+
0 => break,
853+
rec => read_records += rec,
854+
};
847855
}
848856
}
849857
None => {
850-
if let Err(error) = self.array_reader.read_records(self.batch_size) {
851-
return Some(Err(error.into()));
852-
}
858+
self.array_reader.read_records(self.batch_size)?;
853859
}
854860
};
855861

856-
match self.array_reader.consume_batch() {
857-
Err(error) => Some(Err(error.into())),
858-
Ok(array) => {
859-
let struct_array = array.as_struct_opt().ok_or_else(|| {
860-
ArrowError::ParquetError(
861-
"Struct array reader should return struct array".to_string(),
862-
)
863-
});
862+
let array = self.array_reader.consume_batch()?;
863+
let struct_array = array.as_struct_opt().ok_or_else(|| {
864+
ArrowError::ParquetError("Struct array reader should return struct array".to_string())
865+
})?;
864866

865-
match struct_array {
866-
Err(err) => Some(Err(err)),
867-
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
868-
}
869-
}
870-
}
867+
Ok(if struct_array.len() > 0 {
868+
Some(RecordBatch::from(struct_array))
869+
} else {
870+
None
871+
})
871872
}
872873
}
873874

0 commit comments

Comments
 (0)