Skip to content

Commit 00546ea

Browse files
committed
test new IcebergEntriesJniScanner
1 parent 2f726d0 commit 00546ea

File tree

3 files changed

+20
-68
lines changed

3 files changed

+20
-68
lines changed

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergEntriesJniScanner.java

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,34 @@
1818
package org.apache.doris.iceberg;
1919

2020
import org.apache.iceberg.FileScanTask;
21-
import org.apache.iceberg.MetadataTableType;
22-
import org.apache.iceberg.MetadataTableUtils;
21+
import org.apache.iceberg.Schema;
2322
import org.apache.iceberg.StructLike;
24-
import org.apache.iceberg.TableScan;
23+
import org.apache.iceberg.types.Types.NestedField;
24+
import org.apache.iceberg.util.SerializationUtil;
2525

2626
import java.io.IOException;
27-
import java.util.Iterator;
2827
import java.util.Map;
2928

3029
class IcebergEntriesJniScanner extends IcebergMetadataJniScanner {
31-
private static final String NAME = "entries";
30+
// private static final String NAME = "entries";
31+
private FileScanTask task;
32+
private Schema schema;
3233

3334
public IcebergEntriesJniScanner(int batchSize, Map<String, String> params) {
3435
super(batchSize, params);
36+
task = SerializationUtil.deserializeFromBase64(params.get("serialized_split"));
3537
}
3638

3739
@Override
3840
protected void initReader() throws IOException {
39-
TableScan tableScan = MetadataTableUtils
40-
.createMetadataTableInstance(table, MetadataTableType.ENTRIES).newScan();
41-
Iterator<FileScanTask> fileScanTasks = tableScan.planFiles().iterator();
42-
if (!fileScanTasks.hasNext()) {
43-
throw new IOException("No entries found for table: " + table.name());
44-
}
45-
reader = fileScanTasks.next().asDataTask().rows().iterator();
41+
schema = task.schema();
42+
reader = task.asDataTask().rows().iterator();
4643
}
4744

4845
@Override
4946
protected Object getColumnValue(String columnName, Object row) {
5047
StructLike entry = (StructLike) row;
51-
switch (columnName) {
52-
case "status":
53-
return entry.get(0, Integer.class);
54-
case "snapshot_id":
55-
return entry.get(1, Long.class);
56-
case "sequence_number":
57-
return entry.get(2, Long.class);
58-
case "file_sequence_number":
59-
return entry.get(3, Long.class);
60-
case "data_file":
61-
// TODO: implement data_file
62-
return null;
63-
case "readable_metrics":
64-
// TODO: support readable_metrics
65-
return null;
66-
default:
67-
throw new IllegalArgumentException(
68-
"Unrecognized column name " + columnName + " in Iceberg " + NAME + " metadata table");
69-
}
48+
NestedField field = schema.findField(columnName);
49+
return entry.get(field.fieldId(), field.type().typeId().javaClass());
7050
}
7151
}

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergMetadataLogEntriesJniScanner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public IcebergMetadataLogEntriesJniScanner(int batchSize, Map<String, String> pa
3737
@Override
3838
protected void initReader() throws IOException {
3939
TableScan tableScan = MetadataTableUtils
40-
.createMetadataTableInstance(table, MetadataTableType.METADATA_LOG_ENTRIES).newScan();
40+
.createMetadataTableInstance(table, MetadataTableType.METADATA_LOG_ENTRIES).newScan()
41+
.select(requiredFields);
4142
Iterator<FileScanTask> fileScanTasks = tableScan.planFiles().iterator();
4243
if (!fileScanTasks.hasNext()) {
4344
throw new IOException("No metadata log entries found for table: " + table.name());

fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.doris.datasource.CatalogIf;
2727
import org.apache.doris.datasource.ExternalCatalog;
2828
import org.apache.doris.datasource.iceberg.IcebergUtils;
29-
import org.apache.doris.datasource.iceberg.share.ManifestFileBean;
3029
import org.apache.doris.mysql.privilege.PrivPredicate;
3130
import org.apache.doris.qe.ConnectContext;
3231
import org.apache.doris.thrift.TIcebergMetadataParams;
@@ -38,15 +37,16 @@
3837
import com.google.common.collect.ImmutableSet;
3938
import com.google.common.collect.Lists;
4039
import com.google.common.collect.Maps;
40+
41+
import org.apache.iceberg.FileScanTask;
4142
import org.apache.iceberg.MetadataTableType;
4243
import org.apache.iceberg.MetadataTableUtils;
4344
import org.apache.iceberg.Table;
45+
import org.apache.iceberg.io.CloseableIterable;
4446
import org.apache.iceberg.util.SerializationUtil;
4547

46-
import java.util.Collections;
4748
import java.util.List;
4849
import java.util.Map;
49-
import java.util.stream.Collectors;
5050

5151
/**
5252
* The class of table valued function for iceberg metadata.
@@ -59,9 +59,6 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
5959
public static final String QUERY_TYPE = "query_type";
6060

6161
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
62-
private static final ImmutableSet<TIcebergQueryType> CAN_SPLIT_TABLE_TYPES = ImmutableSet.of(
63-
TIcebergQueryType.FILES, TIcebergQueryType.DATA_FILES, TIcebergQueryType.DELETE_FILES,
64-
TIcebergQueryType.PARTITIONS, TIcebergQueryType.POSITION_DELETES);
6562

6663
private final TIcebergQueryType queryType;
6764
private final Map<String, String> hadoopProps;
@@ -114,6 +111,7 @@ public IcebergTableValuedFunction(TableName icebergTableName, TIcebergQueryType
114111
}
115112
ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
116113
hadoopProps = externalCatalog.getCatalogProperty().getHadoopProperties();
114+
// catalog.getPreExecutionAuthenticator().execute
117115
Table table = IcebergUtils.getIcebergTable(externalCatalog, icebergTableName.getDb(),
118116
icebergTableName.getTbl());
119117
if (table == null) {
@@ -138,17 +136,17 @@ public TMetadataType getMetadataType() {
138136
@Override
139137
public List<TMetaScanRange> getMetaScanRanges() {
140138
List<TMetaScanRange> scanRanges = Lists.newArrayList();
141-
List<String> splits = getSplits();
142139
String serializedTable = SerializationUtil.serializeToBase64(table);
143-
for (String split : splits) {
140+
CloseableIterable<FileScanTask> splits = sysTable.newScan().planFiles();
141+
for (FileScanTask split : splits) {
144142
TMetaScanRange metaScanRange = new TMetaScanRange();
145143
metaScanRange.setMetadataType(TMetadataType.ICEBERG);
146144
// set iceberg metadata params
147145
TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
148146
icebergMetadataParams.setIcebergQueryType(queryType);
149147
icebergMetadataParams.setSerializedTable(serializedTable);
150148
icebergMetadataParams.setHadoopProps(hadoopProps);
151-
icebergMetadataParams.setSerializedSplit(split);
149+
icebergMetadataParams.setSerializedSplit(SerializationUtil.serializeToBase64(split));
152150
metaScanRange.setIcebergParams(icebergMetadataParams);
153151
scanRanges.add(metaScanRange);
154152
}
@@ -165,31 +163,4 @@ public String getTableName() {
165163
public List<Column> getTableColumns() {
166164
return schema;
167165
}
168-
169-
private List<String> getSplits() {
170-
if (!CAN_SPLIT_TABLE_TYPES.contains(queryType)) {
171-
return Lists.newArrayList("");
172-
}
173-
if (table.currentSnapshot() == null) {
174-
return Collections.emptyList();
175-
}
176-
switch (queryType) {
177-
case FILES:
178-
case PARTITIONS:
179-
return table.currentSnapshot().allManifests(table.io()).stream()
180-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
181-
.collect(Collectors.toList());
182-
case DATA_FILES:
183-
return table.currentSnapshot().dataManifests(table.io()).stream()
184-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
185-
.collect(Collectors.toList());
186-
case DELETE_FILES:
187-
case POSITION_DELETES:
188-
return table.currentSnapshot().deleteManifests(table.io()).stream()
189-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
190-
.collect(Collectors.toList());
191-
default:
192-
throw new IllegalStateException("Unreachable state with query type " + queryType);
193-
}
194-
}
195166
}

0 commit comments

Comments
 (0)