Skip to content

Commit ebc5b6b

Browse files
committed
test new IcebergEntriesJniScanner
1 parent 2f726d0 commit ebc5b6b

File tree

4 files changed

+27
-87
lines changed

4 files changed

+27
-87
lines changed

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergEntriesJniScanner.java

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,34 @@
1818
package org.apache.doris.iceberg;
1919

2020
import org.apache.iceberg.FileScanTask;
21-
import org.apache.iceberg.MetadataTableType;
22-
import org.apache.iceberg.MetadataTableUtils;
21+
import org.apache.iceberg.Schema;
2322
import org.apache.iceberg.StructLike;
24-
import org.apache.iceberg.TableScan;
23+
import org.apache.iceberg.types.Types.NestedField;
24+
import org.apache.iceberg.util.SerializationUtil;
2525

2626
import java.io.IOException;
27-
import java.util.Iterator;
2827
import java.util.Map;
2928

3029
class IcebergEntriesJniScanner extends IcebergMetadataJniScanner {
31-
private static final String NAME = "entries";
30+
// private static final String NAME = "entries";
31+
private FileScanTask task;
32+
private Schema schema;
3233

3334
public IcebergEntriesJniScanner(int batchSize, Map<String, String> params) {
3435
super(batchSize, params);
36+
task = SerializationUtil.deserializeFromBase64(params.get("serialized_split"));
3537
}
3638

3739
@Override
3840
protected void initReader() throws IOException {
39-
TableScan tableScan = MetadataTableUtils
40-
.createMetadataTableInstance(table, MetadataTableType.ENTRIES).newScan();
41-
Iterator<FileScanTask> fileScanTasks = tableScan.planFiles().iterator();
42-
if (!fileScanTasks.hasNext()) {
43-
throw new IOException("No entries found for table: " + table.name());
44-
}
45-
reader = fileScanTasks.next().asDataTask().rows().iterator();
41+
schema = task.schema();
42+
reader = task.asDataTask().rows().iterator();
4643
}
4744

4845
@Override
4946
protected Object getColumnValue(String columnName, Object row) {
5047
StructLike entry = (StructLike) row;
51-
switch (columnName) {
52-
case "status":
53-
return entry.get(0, Integer.class);
54-
case "snapshot_id":
55-
return entry.get(1, Long.class);
56-
case "sequence_number":
57-
return entry.get(2, Long.class);
58-
case "file_sequence_number":
59-
return entry.get(3, Long.class);
60-
case "data_file":
61-
// TODO: implement data_file
62-
return null;
63-
case "readable_metrics":
64-
// TODO: support readable_metrics
65-
return null;
66-
default:
67-
throw new IllegalArgumentException(
68-
"Unrecognized column name " + columnName + " in Iceberg " + NAME + " metadata table");
69-
}
48+
NestedField field = schema.findField(columnName);
49+
return entry.get(schema.asStruct().fields().indexOf(field), field.type().typeId().javaClass());
7050
}
7151
}

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergMetadataColumnValue.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.doris.common.jni.vec.ColumnValue;
2121

22-
import org.apache.iceberg.data.GenericRecord;
22+
import org.apache.iceberg.StructLike;
2323

2424
import java.math.BigDecimal;
2525
import java.math.BigInteger;
@@ -113,21 +113,18 @@ public byte[] getStringAsBytes() {
113113
} else if (fieldData instanceof byte[]) {
114114
return (byte[]) fieldData;
115115
} else {
116-
return null;
116+
// TODO: handle type HeapByteBuffer
117+
return new byte[0];
117118
}
118119
}
119120

120121
@Override
121122
public LocalDate getDate() {
122-
if (fieldData instanceof Integer) {
123-
return Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).plusDays((int) fieldData).toLocalDate();
124-
}
125-
return null;
123+
return Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).plusDays((int) fieldData).toLocalDate();
126124
}
127125

128126
@Override
129127
public LocalDateTime getDateTime() {
130-
// TODO: Handle the case when fieldData is a timestamp
131128
Instant instant = Instant.ofEpochMilli((long) fieldData);
132129
return LocalDateTime.ofInstant(instant, ZoneId.of(timezone));
133130
}
@@ -141,11 +138,7 @@ public byte[] getBytes() {
141138
public void unpackArray(List<ColumnValue> values) {
142139
List<?> items = (List<?>) fieldData;
143140
for (Object item : items) {
144-
IcebergMetadataColumnValue cv = null;
145-
if (item != null) {
146-
cv = new IcebergMetadataColumnValue(item, timezone);
147-
}
148-
values.add(cv);
141+
values.add(new IcebergMetadataColumnValue(item, timezone));
149142
}
150143
}
151144

@@ -160,14 +153,10 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
160153

161154
@Override
162155
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
163-
GenericRecord record = (GenericRecord) fieldData;
156+
StructLike record = (StructLike) fieldData;
164157
for (Integer fieldIndex : structFieldIndex) {
165-
IcebergMetadataColumnValue value = null;
166-
Object rawValue = record.get(fieldIndex);
167-
if (rawValue != null) {
168-
value = new IcebergMetadataColumnValue(record.get(fieldIndex), timezone);
169-
}
170-
values.add(value);
158+
Object rawValue = record.get(fieldIndex, Object.class);
159+
values.add(new IcebergMetadataColumnValue(rawValue, timezone));
171160
}
172161
}
173162
}

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergMetadataLogEntriesJniScanner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public IcebergMetadataLogEntriesJniScanner(int batchSize, Map<String, String> pa
3737
@Override
3838
protected void initReader() throws IOException {
3939
TableScan tableScan = MetadataTableUtils
40-
.createMetadataTableInstance(table, MetadataTableType.METADATA_LOG_ENTRIES).newScan();
40+
.createMetadataTableInstance(table, MetadataTableType.METADATA_LOG_ENTRIES).newScan()
41+
.select(requiredFields);
4142
Iterator<FileScanTask> fileScanTasks = tableScan.planFiles().iterator();
4243
if (!fileScanTasks.hasNext()) {
4344
throw new IOException("No metadata log entries found for table: " + table.name());

fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.doris.datasource.CatalogIf;
2727
import org.apache.doris.datasource.ExternalCatalog;
2828
import org.apache.doris.datasource.iceberg.IcebergUtils;
29-
import org.apache.doris.datasource.iceberg.share.ManifestFileBean;
3029
import org.apache.doris.mysql.privilege.PrivPredicate;
3130
import org.apache.doris.qe.ConnectContext;
3231
import org.apache.doris.thrift.TIcebergMetadataParams;
@@ -38,15 +37,15 @@
3837
import com.google.common.collect.ImmutableSet;
3938
import com.google.common.collect.Lists;
4039
import com.google.common.collect.Maps;
40+
import org.apache.iceberg.FileScanTask;
4141
import org.apache.iceberg.MetadataTableType;
4242
import org.apache.iceberg.MetadataTableUtils;
4343
import org.apache.iceberg.Table;
44+
import org.apache.iceberg.io.CloseableIterable;
4445
import org.apache.iceberg.util.SerializationUtil;
4546

46-
import java.util.Collections;
4747
import java.util.List;
4848
import java.util.Map;
49-
import java.util.stream.Collectors;
5049

5150
/**
5251
* The class of table valued function for iceberg metadata.
@@ -59,9 +58,6 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
5958
public static final String QUERY_TYPE = "query_type";
6059

6160
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
62-
private static final ImmutableSet<TIcebergQueryType> CAN_SPLIT_TABLE_TYPES = ImmutableSet.of(
63-
TIcebergQueryType.FILES, TIcebergQueryType.DATA_FILES, TIcebergQueryType.DELETE_FILES,
64-
TIcebergQueryType.PARTITIONS, TIcebergQueryType.POSITION_DELETES);
6561

6662
private final TIcebergQueryType queryType;
6763
private final Map<String, String> hadoopProps;
@@ -114,6 +110,7 @@ public IcebergTableValuedFunction(TableName icebergTableName, TIcebergQueryType
114110
}
115111
ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
116112
hadoopProps = externalCatalog.getCatalogProperty().getHadoopProperties();
113+
// catalog.getPreExecutionAuthenticator().execute
117114
Table table = IcebergUtils.getIcebergTable(externalCatalog, icebergTableName.getDb(),
118115
icebergTableName.getTbl());
119116
if (table == null) {
@@ -138,17 +135,17 @@ public TMetadataType getMetadataType() {
138135
@Override
139136
public List<TMetaScanRange> getMetaScanRanges() {
140137
List<TMetaScanRange> scanRanges = Lists.newArrayList();
141-
List<String> splits = getSplits();
142138
String serializedTable = SerializationUtil.serializeToBase64(table);
143-
for (String split : splits) {
139+
CloseableIterable<FileScanTask> splits = sysTable.newScan().planFiles();
140+
for (FileScanTask split : splits) {
144141
TMetaScanRange metaScanRange = new TMetaScanRange();
145142
metaScanRange.setMetadataType(TMetadataType.ICEBERG);
146143
// set iceberg metadata params
147144
TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
148145
icebergMetadataParams.setIcebergQueryType(queryType);
149146
icebergMetadataParams.setSerializedTable(serializedTable);
150147
icebergMetadataParams.setHadoopProps(hadoopProps);
151-
icebergMetadataParams.setSerializedSplit(split);
148+
icebergMetadataParams.setSerializedSplit(SerializationUtil.serializeToBase64(split));
152149
metaScanRange.setIcebergParams(icebergMetadataParams);
153150
scanRanges.add(metaScanRange);
154151
}
@@ -165,31 +162,4 @@ public String getTableName() {
165162
public List<Column> getTableColumns() {
166163
return schema;
167164
}
168-
169-
private List<String> getSplits() {
170-
if (!CAN_SPLIT_TABLE_TYPES.contains(queryType)) {
171-
return Lists.newArrayList("");
172-
}
173-
if (table.currentSnapshot() == null) {
174-
return Collections.emptyList();
175-
}
176-
switch (queryType) {
177-
case FILES:
178-
case PARTITIONS:
179-
return table.currentSnapshot().allManifests(table.io()).stream()
180-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
181-
.collect(Collectors.toList());
182-
case DATA_FILES:
183-
return table.currentSnapshot().dataManifests(table.io()).stream()
184-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
185-
.collect(Collectors.toList());
186-
case DELETE_FILES:
187-
case POSITION_DELETES:
188-
return table.currentSnapshot().deleteManifests(table.io()).stream()
189-
.map(ManifestFileBean::fromManifest).map(SerializationUtil::serializeToBase64)
190-
.collect(Collectors.toList());
191-
default:
192-
throw new IllegalStateException("Unreachable state with query type " + queryType);
193-
}
194-
}
195165
}

0 commit comments

Comments (0)