Skip to content

Commit 81d9772

Browse files
committed
refactor IcebergTableValuedFunction
1 parent 5dc6a03 commit 81d9772

File tree

5 files changed

+135
-61
lines changed

5 files changed

+135
-61
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.doris.analysis.TableValuedFunctionRef;
2121
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
2222
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
23-
import org.apache.doris.tablefunction.IcebergTableValuedFunction;
23+
import org.apache.doris.tablefunction.iceberg.IcebergTableValuedFunction;
2424

2525
import com.google.common.base.Joiner;
2626
import com.google.common.collect.ImmutableList;

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.doris.nereids.trees.expressions.Properties;
2323
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
2424
import org.apache.doris.nereids.types.coercion.AnyDataType;
25-
import org.apache.doris.tablefunction.IcebergTableValuedFunction;
2625
import org.apache.doris.tablefunction.TableValuedFunctionIf;
26+
import org.apache.doris.tablefunction.iceberg.IcebergTableValuedFunction;
2727

2828
import com.google.common.base.Joiner;
2929
import com.google.common.collect.Maps;
@@ -53,7 +53,7 @@ public FunctionSignature customSignature() {
5353
protected TableValuedFunctionIf toCatalogFunction() {
5454
try {
5555
Map<String, String> arguments = getTVFProperties().getMap();
56-
return new IcebergTableValuedFunction(arguments);
56+
return IcebergTableValuedFunction.create(arguments);
5757
} catch (Throwable t) {
5858
throw new AnalysisException("Can not build IcebergTableValuedFunction by "
5959
+ this + ": " + t.getMessage(), t);

fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.planner.ScanNode;
2727
import org.apache.doris.qe.ConnectContext;
2828
import org.apache.doris.qe.SessionVariable;
29+
import org.apache.doris.tablefunction.iceberg.IcebergTableValuedFunction;
2930

3031
import java.util.List;
3132
import java.util.Map;
@@ -57,7 +58,7 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
5758
case LocalTableValuedFunction.NAME:
5859
return new LocalTableValuedFunction(params);
5960
case IcebergTableValuedFunction.NAME:
60-
return new IcebergTableValuedFunction(params);
61+
return IcebergTableValuedFunction.create(params);
6162
case HudiTableValuedFunction.NAME:
6263
return new HudiTableValuedFunction(params);
6364
case BackendsTableValuedFunction.NAME:
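fe/fe-core/src/main/java/org/apache/doris/tablefunction/iceberg/IcebergHistoryTableValuedFunction.java

Lines changed: 48 additions & 0 deletions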
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.tablefunction.iceberg;
19+
20+
import org.apache.doris.analysis.TableName;
21+
import org.apache.doris.catalog.Column;
22+
import org.apache.doris.catalog.MapType;
23+
import org.apache.doris.catalog.ScalarType;
24+
import org.apache.doris.common.AnalysisException;
25+
import org.apache.doris.thrift.TIcebergQueryType;
26+
27+
import com.google.common.collect.ImmutableList;
28+
29+
import java.util.List;
30+
31+
class IcebergHistoryTableValuedFunction extends IcebergTableValuedFunction {
32+
private static final ImmutableList<Column> SCHEMA_SNAPSHOT = ImmutableList.of(
33+
new Column("committed_at", ScalarType.DATETIMEV2),
34+
new Column("snapshot_id", ScalarType.BIGINT),
35+
new Column("parent_id", ScalarType.BIGINT),
36+
new Column("operation", ScalarType.STRING),
37+
new Column("manifest_list", ScalarType.STRING),
38+
new Column("summary", new MapType(ScalarType.STRING, ScalarType.STRING)));
39+
40+
public IcebergHistoryTableValuedFunction(TableName icebergTableName) throws AnalysisException {
41+
super(icebergTableName, TIcebergQueryType.HISTORY);
42+
}
43+
44+
@Override
45+
protected List<Column> getSchema() {
46+
return SCHEMA_SNAPSHOT;
47+
}
48+
}

fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java renamed to fe/fe-core/src/main/java/org/apache/doris/tablefunction/iceberg/IcebergTableValuedFunction.java

Lines changed: 82 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package org.apache.doris.tablefunction;
18+
package org.apache.doris.tablefunction.iceberg;
1919

2020
import org.apache.doris.analysis.TableName;
2121
import org.apache.doris.catalog.Column;
2222
import org.apache.doris.catalog.Env;
23-
import org.apache.doris.catalog.MapType;
24-
import org.apache.doris.catalog.ScalarType;
2523
import org.apache.doris.common.AnalysisException;
2624
import org.apache.doris.common.ErrorCode;
2725
import org.apache.doris.common.ErrorReport;
@@ -30,13 +28,12 @@
3028
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
3129
import org.apache.doris.mysql.privilege.PrivPredicate;
3230
import org.apache.doris.qe.ConnectContext;
31+
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
3332
import org.apache.doris.thrift.TIcebergMetadataParams;
3433
import org.apache.doris.thrift.TIcebergQueryType;
3534
import org.apache.doris.thrift.TMetaScanRange;
3635
import org.apache.doris.thrift.TMetadataType;
3736

38-
import com.google.common.collect.ImmutableList;
39-
import com.google.common.collect.ImmutableMap;
4037
import com.google.common.collect.ImmutableSet;
4138
import com.google.common.collect.Lists;
4239
import com.google.common.collect.Maps;
@@ -47,31 +44,25 @@
4744
import java.util.Map;
4845

4946
/**
50-
* The Implement of table valued function
47+
* Abstract base class for table-valued functions that expose Iceberg table metadata.
5148
* iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
5249
*/
53-
public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
50+
public abstract class IcebergTableValuedFunction extends MetadataTableValuedFunction {
5451

5552
public static final String NAME = "iceberg_meta";
5653
public static final String TABLE = "table";
5754
public static final String QUERY_TYPE = "query_type";
5855

5956
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
6057

61-
private static final ImmutableList<Column> SCHEMA_SNAPSHOT = ImmutableList.of(
62-
new Column("committed_at", ScalarType.DATETIMEV2),
63-
new Column("snapshot_id", ScalarType.BIGINT),
64-
new Column("parent_id", ScalarType.BIGINT),
65-
new Column("operation", ScalarType.STRING),
66-
new Column("manifest_list", ScalarType.STRING),
67-
new Column("summary", new MapType(ScalarType.STRING, ScalarType.STRING)));
68-
69-
private TIcebergQueryType queryType;
70-
71-
// here tableName represents the name of a table in Iceberg.
58+
private final TIcebergQueryType queryType;
7259
private final TableName icebergTableName;
7360

74-
public IcebergTableValuedFunction(Map<String, String> params) throws AnalysisException {
61+
private final Map<String, String> hadoopProps;
62+
protected final Table table;
63+
64+
public static IcebergTableValuedFunction create(Map<String, String> params)
65+
throws AnalysisException {
7566
Map<String, String> validParams = Maps.newHashMap();
7667
for (String key : params.keySet()) {
7768
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
@@ -89,21 +80,48 @@ public IcebergTableValuedFunction(Map<String, String> params) throws AnalysisExc
8980
if (names.length != 3) {
9081
throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName");
9182
}
92-
this.icebergTableName = new TableName(names[0], names[1], names[2]);
83+
TableName icebergTableName = new TableName(names[0], names[1], names[2]);
9384
// check auth
9485
if (!Env.getCurrentEnv().getAccessManager()
95-
.checkTblPriv(ConnectContext.get(), this.icebergTableName, PrivPredicate.SELECT)) {
86+
.checkTblPriv(ConnectContext.get(), icebergTableName, PrivPredicate.SELECT)) {
9687
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
9788
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
98-
this.icebergTableName.getDb() + ": " + this.icebergTableName.getTbl());
89+
icebergTableName.getDb() + ": " + icebergTableName.getTbl());
9990
}
91+
TIcebergQueryType queryType;
10092
try {
101-
this.queryType = TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
93+
queryType = TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
10294
} catch (IllegalArgumentException e) {
103-
throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType);
95+
throw new AnalysisException("Unrecognized iceberg metadata query type: " + queryTypeString);
96+
}
97+
98+
switch (queryType) {
99+
case HISTORY:
100+
return new IcebergHistoryTableValuedFunction(icebergTableName);
101+
default:
102+
throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType);
104103
}
105104
}
106105

106+
public IcebergTableValuedFunction(TableName icebergTableName, TIcebergQueryType queryType)
107+
throws AnalysisException {
108+
this.icebergTableName = icebergTableName;
109+
this.queryType = queryType;
110+
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
111+
if (!(catalog instanceof ExternalCatalog)) {
112+
throw new AnalysisException("Catalog " + icebergTableName.getCtl() + " is not an external catalog");
113+
}
114+
ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
115+
hadoopProps = externalCatalog.getCatalogProperty().getHadoopProperties();
116+
IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
117+
Table table = icebergMetadataCache.getIcebergTable(catalog, icebergTableName.getDb(),
118+
icebergTableName.getTbl());
119+
if (table == null) {
120+
throw new AnalysisException("Iceberg table " + icebergTableName + " does not exist");
121+
}
122+
this.table = table;
123+
}
124+
107125
public TIcebergQueryType getIcebergQueryType() {
108126
return queryType;
109127
}
@@ -114,46 +132,53 @@ public TMetadataType getMetadataType() {
114132
}
115133

116134
@Override
117-
public TMetaScanRange getMetaScanRange() {
118-
TMetaScanRange metaScanRange = new TMetaScanRange();
119-
metaScanRange.setMetadataType(TMetadataType.ICEBERG);
120-
// set iceberg metadata params
121-
TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
122-
icebergMetadataParams.setIcebergQueryType(queryType);
123-
// TODO: remove this after iceberg metadata cache is ready
124-
icebergMetadataParams.setCatalog(icebergTableName.getCtl());
125-
icebergMetadataParams.setDatabase(icebergTableName.getDb());
126-
icebergMetadataParams.setTable(icebergTableName.getTbl());
127-
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
128-
IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
129-
Table table = icebergMetadataCache.getIcebergTable(catalog, icebergTableName.getDb(),
130-
icebergTableName.getTbl());
131-
String serializedTable = SerializationUtil.serializeToBase64(table);
132-
icebergMetadataParams.setSerializedTable(serializedTable);
133-
ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
134-
Map<String, String> hadoopProps = externalCatalog.getCatalogProperty().getHadoopProperties();
135-
icebergMetadataParams.setHadoopProps(hadoopProps);
136-
metaScanRange.setIcebergParams(icebergMetadataParams);
137-
return metaScanRange;
135+
public List<TMetaScanRange> getMetaScanRanges() {
136+
List<TMetaScanRange> scanRanges = Lists.newArrayList();
137+
List<String> splits = getSplits();
138+
for (String split : splits) {
139+
TMetaScanRange metaScanRange = new TMetaScanRange();
140+
metaScanRange.setMetadataType(TMetadataType.ICEBERG);
141+
// set iceberg metadata params
142+
TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
143+
icebergMetadataParams.setIcebergQueryType(queryType);
144+
icebergMetadataParams.setCatalog(icebergTableName.getCtl());
145+
icebergMetadataParams.setDatabase(icebergTableName.getDb());
146+
icebergMetadataParams.setTable(icebergTableName.getTbl());
147+
String serializedTable = SerializationUtil.serializeToBase64(table);
148+
icebergMetadataParams.setSerializedTable(serializedTable);
149+
icebergMetadataParams.setHadoopProps(hadoopProps);
150+
icebergMetadataParams.setSerializedSplit(split);
151+
metaScanRange.setIcebergParams(icebergMetadataParams);
152+
scanRanges.add(metaScanRange);
153+
}
154+
return scanRanges;
138155
}
139156

140157
@Override
141158
public String getTableName() {
142-
return "IcebergMetadataTableValuedFunction";
159+
String queryTypeString = queryType.name().toLowerCase();
160+
return "IcebergTableValuedFunction<" + queryTypeString + ">";
143161
}
144162

145-
/**
146-
* The tvf can register columns of metadata table
147-
* The data is provided by getIcebergMetadataTable in FrontendService
148-
*
149-
* @return metadata columns
150-
* @see org.apache.doris.service.FrontendServiceImpl
151-
*/
152163
@Override
153164
public List<Column> getTableColumns() {
154-
if (queryType == TIcebergQueryType.SNAPSHOTS) {
155-
return SCHEMA_SNAPSHOT;
156-
}
157-
return Lists.newArrayList();
165+
return getSchema();
158166
}
167+
168+
/**
169+
* Get the splits for the iceberg table valued function.
170+
* Subclasses may override this method to return several splits; one scan range is built per split, and the default is a single empty split.
171+
*
172+
* @return a list of splits
173+
*/
174+
protected List<String> getSplits() {
175+
return Lists.newArrayList("");
176+
}
177+
178+
/**
179+
* Get the schema for the iceberg table valued function.
180+
*
181+
* @return a list of columns representing the schema
182+
*/
183+
protected abstract List<Column> getSchema();
159184
}

0 commit comments

Comments
 (0)