Skip to content

Commit 9801801

Browse files
mapleFUpitrou
andauthored
GH-31992: [C++][Parquet] Handling the special case when DataPageV2 values buffer is empty (#45252)
### Rationale for this change In DataPageV2, the levels and data will not be compressed together. So, we might get the "empty" data page buffer. When meeting this, Snappy C++ will failed to decompress the `(input_len == 0, output_len == 0)` data. ### What changes are included in this PR? Handling the case in `column_reader.cc` ### Are these changes tested? * [x] Will add ### Are there any user-facing changes? Minor fix * GitHub Issue: #31992 Lead-authored-by: mwish <[email protected]> Co-authored-by: mwish <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: mwish <[email protected]>
1 parent 61c82ea commit 9801801

File tree

3 files changed

+49
-8
lines changed

3 files changed

+49
-8
lines changed

cpp/src/parquet/column_reader.cc

+15-7
Original file line numberDiff line numberDiff line change
@@ -580,13 +580,21 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
580580
memcpy(decompressed, page_buffer->data(), levels_byte_len);
581581
}
582582

583-
// Decompress the values
584-
PARQUET_ASSIGN_OR_THROW(
585-
auto decompressed_len,
586-
decompressor_->Decompress(compressed_len - levels_byte_len,
587-
page_buffer->data() + levels_byte_len,
588-
uncompressed_len - levels_byte_len,
589-
decompression_buffer_->mutable_data() + levels_byte_len));
583+
// GH-31992: DataPageV2 may store only levels and no values when all
584+
// values are null. In this case, Parquet java is known to produce a
585+
// 0-len compressed area (which is invalid compressed input).
586+
// See https://github.com/apache/parquet-java/issues/3122
587+
int64_t decompressed_len = 0;
588+
if (uncompressed_len - levels_byte_len != 0) {
589+
// Decompress the values
590+
PARQUET_ASSIGN_OR_THROW(
591+
decompressed_len,
592+
decompressor_->Decompress(
593+
compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
594+
uncompressed_len - levels_byte_len,
595+
decompression_buffer_->mutable_data() + levels_byte_len));
596+
}
597+
590598
if (decompressed_len != uncompressed_len - levels_byte_len) {
591599
throw ParquetException("Page didn't decompress to expected size, expected: " +
592600
std::to_string(uncompressed_len - levels_byte_len) +

cpp/src/parquet/column_reader_test.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
465465
int32_t values[batch_size];
466466
int64_t values_read;
467467
ASSERT_TRUE(reader->HasNext());
468-
EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*replevels=*/nullptr,
468+
EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*rep_levels=*/nullptr,
469469
values, &values_read));
470470
EXPECT_EQ(3, values_read);
471471
}

cpp/src/parquet/column_writer_test.cc

+33
Original file line numberDiff line numberDiff line change
@@ -1774,5 +1774,38 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
17741774
ASSERT_EQ("bar", value);
17751775
}
17761776

1777+
TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) {
1778+
// GH-31992: In DataPageV2, the levels and data will not be compressed together,
1779+
// so, when all values are null, the compressed values should be empty. And
1780+
// we should handle this case correctly.
1781+
std::vector<Compression::type> compressions = {Compression::SNAPPY, Compression::GZIP,
1782+
Compression::ZSTD, Compression::BROTLI,
1783+
Compression::LZ4};
1784+
for (auto compression : compressions) {
1785+
if (!Codec::IsAvailable(compression)) {
1786+
continue;
1787+
}
1788+
ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression));
1789+
// Optional and non-repeated, with definition levels
1790+
// but no repetition levels
1791+
this->SetUpSchema(Repetition::OPTIONAL);
1792+
this->GenerateData(SMALL_SIZE);
1793+
std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0);
1794+
ColumnProperties column_properties;
1795+
column_properties.set_compression(compression);
1796+
1797+
auto writer =
1798+
this->BuildWriter(SMALL_SIZE, column_properties, ParquetVersion::PARQUET_2_LATEST,
1799+
ParquetDataPageVersion::V2);
1800+
writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr,
1801+
this->values_ptr_);
1802+
writer->Close();
1803+
1804+
ASSERT_EQ(100, this->metadata_num_values());
1805+
this->ReadColumn(compression);
1806+
ASSERT_EQ(0, this->values_read_);
1807+
}
1808+
}
1809+
17771810
} // namespace test
17781811
} // namespace parquet

0 commit comments

Comments
 (0)