Skip to content

Commit 2bca0cf

Browse files
committed
impl IcebergHistoryJniScanner
1 parent 6f43f2c commit 2bca0cf

File tree

1 file changed

+73
-0
lines changed

1 file changed

+73
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.iceberg;
19+
20+
import org.apache.iceberg.HistoryEntry;
21+
import org.apache.iceberg.util.SnapshotUtil;
22+
23+
import java.io.IOException;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
class IcebergHistoryJniScanner extends IcebergMetadataJniScanner {
30+
private static final String NAME = "history";
31+
private static final Map<String, String> HISTORY_SCHEMA = new HashMap<>();
32+
static {
33+
HISTORY_SCHEMA.put("made_current_at", "datetime");
34+
HISTORY_SCHEMA.put("snapshot_id", "bigint");
35+
HISTORY_SCHEMA.put("parent_id", "bigint");
36+
HISTORY_SCHEMA.put("is_current_ancestor", "boolean");
37+
38+
}
39+
Set<Long> ancestorIds;
40+
41+
public IcebergHistoryJniScanner(int batchSize, Map<String, String> params) {
42+
super(batchSize, params);
43+
}
44+
45+
@Override
46+
protected void initReader() throws IOException {
47+
ancestorIds = new HashSet<>(SnapshotUtil.currentAncestorIds(table));
48+
reader = table.history().iterator();
49+
}
50+
51+
@Override
52+
protected Map<String, String> getMetadataSchema() {
53+
return HISTORY_SCHEMA;
54+
}
55+
56+
@Override
57+
protected Object getColumnValue(String columnName, Object row) {
58+
HistoryEntry entry = (HistoryEntry) row;
59+
switch (columnName) {
60+
case "made_current_at":
61+
return entry.timestampMillis();
62+
case "snapshot_id":
63+
return entry.snapshotId();
64+
case "parent_id":
65+
return table.snapshot(entry.snapshotId()).parentId();
66+
case "is_current_ancestor":
67+
return ancestorIds.contains(entry.snapshotId());
68+
default:
69+
throw new IllegalArgumentException(
70+
"Unrecognized column name " + columnName + " in Iceberg " + NAME + " metadata table");
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)