Commit 2f726d0

fix Partitions
1 parent cbaabce commit 2f726d0

File tree

3 files changed: +37 -14 lines changed


fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergMetadataJniScanner.java

Lines changed: 1 addition & 1 deletion
@@ -38,13 +38,13 @@
  * Abstract class for Iceberg metadata scanner.
  */
 public abstract class IcebergMetadataJniScanner extends JniScanner {
+    protected static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataJniScanner.class);
     protected final String[] requiredFields;
     protected final Table table;
     protected final String timezone;
     protected Iterator<?> reader; // reader is initialized in the initReader() method

     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
-    private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataJniScanner.class);
     private final PreExecutionAuthenticator preExecutionAuthenticator;
     private final ClassLoader classLoader;
     private ColumnType[] requiredTypes;
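Note: LOG moves from a private field at the bottom of the class to a protected static field at the top, so the same logger becomes visible to subclasses; the new LOG.warn fallback in IcebergPartitionsJniScanner below depends on this. A minimal sketch of the inherited-logger pattern, using the slf4j API from the diff and hypothetical class names:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    abstract class BaseJniScanner {
        // protected static: every subclass shares this one logger instance,
        // so events are attributed to the base class's name.
        protected static final Logger LOG = LoggerFactory.getLogger(BaseJniScanner.class);
    }

    class PartitionsScanner extends BaseJniScanner {
        Object getColumnValue(String columnName) {
            LOG.warn("Unrecognized column name " + columnName); // no per-subclass logger field needed
            return null;
        }
    }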

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergPartitionsJniScanner.java

Lines changed: 34 additions & 12 deletions
@@ -24,6 +24,8 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.types.Type;
@@ -41,10 +43,14 @@

 class IcebergPartitionsJniScanner extends IcebergMetadataJniScanner {
     private static final String NAME = "partitions";
+    private static final List<String> SCAN_COLUMNS = List.of("content", "partition", "file_size_in_bytes",
+            "record_count");

     private List<PartitionField> partitionFields;
     private Long lastUpdateTime;
+    private Long lastUpdateSnapshotId;
     private Integer specId;
+    private Schema schema;
     private GenericRecord reusedRecord;

     // A serializable bean that contains a bare minimum to read a manifest
@@ -58,16 +64,18 @@ public IcebergPartitionsJniScanner(int batchSize, Map<String, String> params) {
     @Override
     protected void initReader() throws IOException {
         this.specId = manifestBean.partitionSpecId();
-        this.lastUpdateTime = table.snapshot(manifestBean.snapshotId()) != null
-                ? table.snapshot(manifestBean.snapshotId()).timestampMillis()
-                : null;
+        Snapshot snapshot = table.snapshot(manifestBean.snapshotId());
+        this.lastUpdateTime = snapshot != null ? snapshot.timestampMillis() : null;
+        this.lastUpdateSnapshotId = snapshot != null ? snapshot.snapshotId() : null;
         this.partitionFields = getAllPartitionFields(table);
-        // TODO: Initialize the reused record with partition fields
-        // this.reusedRecord = GenericRecord.create(getResultType());
+        this.schema = table.schema();
+        this.reusedRecord = GenericRecord.create(getResultType());
         if (manifestBean.content() == ManifestContent.DATA) {
-            reader = ManifestFiles.read(manifestBean, table.io(), table.specs()).iterator();
+            reader = ManifestFiles.read(manifestBean, table.io(), table.specs()).select(SCAN_COLUMNS)
+                    .caseSensitive(false).iterator();
         } else {
-            reader = ManifestFiles.readDeleteManifest(manifestBean, table.io(), table.specs()).iterator();
+            reader = ManifestFiles.readDeleteManifest(manifestBean, table.io(), table.specs()).select(SCAN_COLUMNS)
+                    .caseSensitive(false).iterator();
         }
     }
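The readers now project only the manifest columns this scan consumes (SCAN_COLUMNS) rather than materializing every field, and match column names case-insensitively. A self-contained sketch of the same Iceberg manifest-reading pattern; the method name and projected columns here are illustrative:

    import java.io.IOException;
    import java.util.List;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestReader;
    import org.apache.iceberg.Table;

    class ManifestScanSketch {
        // Reads one data manifest, projecting only the partition tuple and row count.
        static void printPartitionCounts(Table table, ManifestFile manifest) throws IOException {
            try (ManifestReader<DataFile> reader =
                    ManifestFiles.read(manifest, table.io(), table.specs())
                            .select(List.of("partition", "record_count"))
                            .caseSensitive(false)) {
                for (DataFile file : reader) {
                    System.out.println(file.partition() + " -> " + file.recordCount());
                }
            }
        }
    }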

@@ -83,25 +91,39 @@ protected Object getColumnValue(String columnName, Object row) {
             case "record_count":
                 return content == FileContent.DATA ? file.recordCount() : 0;
             case "file_count":
-                return content == FileContent.DATA ? 1L : 0L;
+                return content == FileContent.DATA ? 1 : 0;
             case "total_data_file_size_in_bytes":
                 return content == FileContent.DATA ? file.fileSizeInBytes() : 0;
             case "position_delete_record_count":
                 return content == FileContent.POSITION_DELETES ? file.recordCount() : 0;
             case "position_delete_file_count":
-                return content == FileContent.POSITION_DELETES ? 1L : 0L;
+                return content == FileContent.POSITION_DELETES ? 1 : 0;
             case "equality_delete_record_count":
                 return content == FileContent.EQUALITY_DELETES ? file.recordCount() : 0;
             case "equality_delete_file_count":
-                return content == FileContent.EQUALITY_DELETES ? 1L : 0L;
+                return content == FileContent.EQUALITY_DELETES ? 1 : 0;
             case "last_updated_at":
                 return lastUpdateTime;
+            case "last_updated_snapshot_id":
+                return lastUpdateSnapshotId;
             default:
-                throw new IllegalArgumentException(
-                        "Unrecognized column name " + columnName + " in Iceberg " + NAME + " metadata table");
+                LOG.warn("Unrecognized column name " + columnName + " in Iceberg " + NAME + " metadata table");
+                return null;
         }
     }

+    private Types.StructType getResultType() {
+        List<Types.NestedField> fields = new ArrayList<>();
+        for (PartitionField partitionField : partitionFields) {
+            int id = partitionField.fieldId();
+            String name = partitionField.name();
+            Type type = partitionField.transform().getResultType(schema.findType(partitionField.sourceId()));
+            Types.NestedField nestedField = Types.NestedField.optional(id, name, type);
+            fields.add(nestedField);
+        }
+        return Types.StructType.of(fields);
+    }
+
     private Object getPartitionValues(PartitionData partitionData) {
         List<Types.NestedField> fileFields = partitionData.getPartitionType().fields();
         Map<Integer, Integer> fieldIdToPos = new HashMap<>();
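The new getResultType() derives the partition struct by running each PartitionField's transform over its source column type (a bucket transform yields an int field regardless of source type, for example), and initReader() now allocates one GenericRecord up front to be refilled per manifest entry. A minimal sketch of that struct-type/reused-record pattern; the field ids, names, and types are made up for illustration:

    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.types.Types;

    class PartitionRecordSketch {
        static void demo() {
            // Hypothetical partition struct: identity(dt) plus bucket(16, id) -> int.
            Types.StructType partitionType = Types.StructType.of(
                    Types.NestedField.optional(1000, "dt", Types.StringType.get()),
                    Types.NestedField.optional(1001, "id_bucket", Types.IntegerType.get()));
            // One record allocated once, then refilled for each row emitted.
            GenericRecord reused = GenericRecord.create(partitionType);
            reused.setField("dt", "2024-01-01");
            reused.setField("id_bucket", 3);
        }
    }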

fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java

Lines changed: 2 additions & 1 deletion
@@ -61,7 +61,7 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
     private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
     private static final ImmutableSet<TIcebergQueryType> CAN_SPLIT_TABLE_TYPES = ImmutableSet.of(
             TIcebergQueryType.FILES, TIcebergQueryType.DATA_FILES, TIcebergQueryType.DELETE_FILES,
-            TIcebergQueryType.POSITION_DELETES);
+            TIcebergQueryType.PARTITIONS, TIcebergQueryType.POSITION_DELETES);

     private final TIcebergQueryType queryType;
     private final Map<String, String> hadoopProps;
@@ -175,6 +175,7 @@ private List<String> getSplits() {
         }
         switch (queryType) {
             case FILES:
+            case PARTITIONS:
                 return table.currentSnapshot().allManifests(table.io()).stream()
                         .map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
                         .collect(Collectors.toList());
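With PARTITIONS added to CAN_SPLIT_TABLE_TYPES and to the FILES branch of getSplits(), a partitions query fans out like files: one split per manifest of the current snapshot, each a base64-serialized ManifestFileBean. A sketch of the round trip; the BE-side deserializeFromBase64 call is an assumption based on Iceberg's SerializationUtil API, not code shown in this commit, and the import of Doris's ManifestFileBean is omitted:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.util.SerializationUtil;
    // import of Doris's ManifestFileBean omitted (package is internal to doris-fe)

    class SplitRoundTripSketch {
        // FE side: one base64 string per manifest (mirrors getSplits() above).
        static List<String> toSplits(Table table) {
            return table.currentSnapshot().allManifests(table.io()).stream()
                    .map(ManifestFileBean::fromManifest)
                    .map(SerializationUtil::serializeToBase64)
                    .collect(Collectors.toList());
        }

        // BE side (assumed): each scanner rebuilds its manifest bean from its split.
        static ManifestFileBean fromSplit(String split) {
            return SerializationUtil.deserializeFromBase64(split);
        }
    }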
