Skip to content

Commit 25110ce

Browse files
committed
refactor
1 parent d0d916d commit 25110ce

File tree

2 files changed

+38
-49
lines changed

2 files changed

+38
-49
lines changed

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergMetadataJniScanner.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
import java.io.IOException;
31-
import java.util.HashMap;
3231
import java.util.Map;
3332
import java.util.TimeZone;
3433
import java.util.stream.Collectors;
@@ -45,12 +44,10 @@ public abstract class IcebergMetadataJniScanner extends JniScanner {
4544
private PreExecutionAuthenticator preExecutionAuthenticator;
4645
private final ClassLoader classLoader;
4746
private final String serializedTable;
48-
private final int batchSize;
4947
private ColumnType[] requiredTypes;
5048

5149
public IcebergMetadataJniScanner(int batchSize, Map<String, String> params) {
5250
this.classLoader = this.getClass().getClassLoader();
53-
this.batchSize = batchSize;
5451
this.requiredFields = params.get("required_fields").split(",");
5552
this.serializedTable = params.get("serialized_table");
5653
this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
@@ -91,20 +88,25 @@ public void open() throws IOException {
9188
/**
9289
* Get the metadata schema from the table.
9390
*
94-
* @return a map of metadata column name to type
91+
* @return a map of metadata column name to type, see {@link ColumnType} for how
92+
* to parse the type.
9593
*/
96-
protected abstract HashMap<String, String> getMetadataSchema();
94+
protected abstract Map<String, String> getMetadataSchema();
9795

9896
private void parseRequiredTypes() {
99-
HashMap<String, String> metadataSchema = getMetadataSchema();
97+
Map<String, String> metadataSchema = getMetadataSchema();
10098
requiredTypes = new ColumnType[requiredFields.length];
10199
for (int i = 0; i < requiredFields.length; i++) {
102100
String field = requiredFields[i];
103101
String type = metadataSchema.get(field);
104102
if (type == null) {
105103
throw new IllegalArgumentException("Field " + field + " not found in metadata column map");
106104
}
107-
requiredTypes[i] = ColumnType.parseType(field, type);
105+
ColumnType parsedType = ColumnType.parseType(field, type);
106+
if (parsedType.isUnsupported()) {
107+
throw new IllegalArgumentException("Unsupported type " + type + " for field " + field);
108+
}
109+
requiredTypes[i] = parsedType;
108110
}
109111
}
110112
}

fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSnapshotsJniScanner.java

Lines changed: 29 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,35 @@
1818
package org.apache.doris.iceberg;
1919

2020
import org.apache.doris.common.jni.vec.ColumnValue;
21-
22-
import com.google.common.collect.ImmutableMap;
23-
import com.google.common.collect.Maps;
24-
import com.google.common.collect.Streams;
25-
import org.apache.iceberg.MetadataTableType;
26-
import org.apache.iceberg.MetadataTableUtils;
27-
import org.apache.iceberg.StructLike;
21+
import org.apache.iceberg.Snapshot;
2822
import org.apache.iceberg.Table;
29-
import org.apache.iceberg.TableScan;
30-
import org.apache.iceberg.io.CloseableIterator;
3123

3224
import java.io.IOException;
3325
import java.util.HashMap;
26+
import java.util.Iterator;
3427
import java.util.Map;
3528

3629
class IcebergSnapshotsJniScanner extends IcebergMetadataJniScanner {
3730

38-
private CloseableIterator<StructLike> reader;
39-
private Map<String, Integer> columnNameToPosition = new HashMap<>();
31+
private static final Map<String, String> SNAPSHOTS_SCHEMA = new HashMap<>();
32+
static {
33+
SNAPSHOTS_SCHEMA.put("committed_at", "datetime");
34+
SNAPSHOTS_SCHEMA.put("snapshot_id", "long");
35+
SNAPSHOTS_SCHEMA.put("parent_id", "long");
36+
SNAPSHOTS_SCHEMA.put("operation", "string");
37+
SNAPSHOTS_SCHEMA.put("manifest_list", "string");
38+
SNAPSHOTS_SCHEMA.put("summary", "string");
39+
}
40+
41+
private Iterator<Snapshot> reader;
4042

4143
public IcebergSnapshotsJniScanner(int batchSize, Map<String, String> params) {
4244
super(batchSize, params);
4345
}
4446

4547
@Override
4648
protected void loadTable(Table table) throws IOException {
47-
TableScan tableScan = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.SNAPSHOTS)
48-
.newScan();
49-
this.columnNameToPosition = Streams.mapWithIndex(tableScan.schema().columns().stream(),
50-
(column, position) -> Maps.immutableEntry(column.name(), Long.valueOf(position).intValue()))
51-
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
52-
for (String requiredField : requiredFields) {
53-
if (!columnNameToPosition.containsKey(requiredField)) {
54-
throw new IOException("Invalid required field: " + requiredField);
55-
}
56-
}
57-
this.reader = tableScan.planFiles().iterator().next().asDataTask().rows().iterator();
49+
reader = table.snapshots().iterator();
5850
}
5951

6052
@Override
@@ -64,10 +56,10 @@ protected int getNext() throws IOException {
6456
}
6557
int rows = 0;
6658
while (reader.hasNext() && rows < getBatchSize()) {
67-
StructLike dataRow = reader.next();
59+
Snapshot snapshot = reader.next();
6860
for (int i = 0; i < requiredFields.length; i++) {
6961
String columnName = requiredFields[i];
70-
Object value = getValue(columnName, dataRow);
62+
Object value = getValue(columnName, snapshot);
7163
if (value == null) {
7264
appendData(i, null);
7365
} else {
@@ -82,39 +74,34 @@ protected int getNext() throws IOException {
8274

8375
@Override
8476
public void close() throws IOException {
77+
// TODO: move this to base class
8578
if (reader != null) {
86-
reader.close();
79+
reader = null; // Clear the iterator to release resources
8780
}
8881
}
8982

9083
@Override
91-
protected HashMap<String, String> getMetadataSchema() {
92-
HashMap<String, String> metadataSchema = new HashMap<>();
93-
metadataSchema.put("committed_at", "long");
94-
metadataSchema.put("snapshot_id", "long");
95-
metadataSchema.put("parent_id", "long");
96-
metadataSchema.put("operation", "string");
97-
metadataSchema.put("manifest_list", "string");
98-
metadataSchema.put("summary", "string");
99-
return metadataSchema;
84+
protected Map<String, String> getMetadataSchema() {
85+
return SNAPSHOTS_SCHEMA;
10086
}
10187

102-
private Object getValue(String columnName, StructLike dataRow) {
88+
private Object getValue(String columnName, Snapshot snapshot) {
10389
switch (columnName) {
10490
case "committed_at":
105-
return dataRow.get(columnNameToPosition.get(columnName), Long.class) / 1000;
91+
return snapshot.timestampMillis();
10692
case "snapshot_id":
107-
return dataRow.get(columnNameToPosition.get(columnName), Long.class);
93+
return snapshot.snapshotId();
10894
case "parent_id":
109-
return dataRow.get(columnNameToPosition.get(columnName), Long.class);
95+
return snapshot.parentId();
11096
case "operation":
111-
return dataRow.get(columnNameToPosition.get(columnName), String.class);
97+
return snapshot.operation();
11298
case "manifest_list":
113-
return dataRow.get(columnNameToPosition.get(columnName), String.class);
99+
return snapshot.manifestListLocation();
114100
case "summary":
115-
return dataRow.get(columnNameToPosition.get(columnName), Map.class);
101+
return snapshot.summary();
116102
default:
117-
throw new IllegalArgumentException("Unrecognized column name " + columnName);
103+
throw new IllegalArgumentException(
104+
"Unrecognized column name " + columnName + " in Iceberg snapshot metadata table");
118105
}
119106
}
120107
}

0 commit comments

Comments
 (0)