Skip to content

Commit 22cb578

Browse files
committed
first version
Signed-off-by: Jiao Mingye <[email protected]>
1 parent bb97981 commit 22cb578

File tree

6 files changed

+171
-48
lines changed

6 files changed

+171
-48
lines changed

fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public org.apache.paimon.table.Table getNativeTable() {
7878
return paimonNativeTable;
7979
}
8080

81+
// For refresh table only: replaces the Paimon table handle held by this object
// with the one supplied by the caller.
82+
public void setPaimonNativeTable(org.apache.paimon.table.Table paimonNativeTable) {
83+
this.paimonNativeTable = paimonNativeTable;
84+
}
85+
8186
@Override
8287
public String getUUID() {
8388
return String.join(".", catalogName, databaseName, tableName, Long.toString(createTime));

fe/fe-core/src/main/java/com/starrocks/common/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2469,6 +2469,12 @@ public class Config extends ConfigBase {
24692469
@ConfField(mutable = true)
24702470
public static long iceberg_metadata_cache_max_entry_size = 8388608L;
24712471

2472+
/**
2473+
* Whether refreshing a Paimon table also preheats its manifest file cache by planning a scan; disabled by default.
2474+
*/
2475+
@ConfField(mutable = true)
2476+
public static boolean enable_paimon_refresh_manifest_files = false;
2477+
24722478
/**
24732479
* fe will call es api to get es index shard info every es_state_sync_interval_secs
24742480
*/

fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,19 @@
3232
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
3333
import org.apache.logging.log4j.LogManager;
3434
import org.apache.logging.log4j.Logger;
35+
import org.apache.paimon.catalog.Catalog;
36+
import org.apache.paimon.catalog.Identifier;
3537

3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.Objects;
3941
import java.util.Optional;
4042
import java.util.Set;
4143
import java.util.concurrent.ConcurrentHashMap;
44+
import java.util.concurrent.ExecutionException;
4245
import java.util.concurrent.ExecutorService;
4346
import java.util.concurrent.Executors;
47+
import java.util.concurrent.Future;
4448
import java.util.stream.Collectors;
4549

4650
public class ConnectorTableMetadataProcessor extends FrontendDaemon {
@@ -53,6 +57,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {
5357

5458
private final ExecutorService refreshRemoteFileExecutor;
5559
private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
60+
private final Map<String, Catalog> paimonCatalogs = new ConcurrentHashMap<>();
5661

5762
public void registerTableInfo(BaseTableInfo tableInfo) {
5863
registeredTableInfos.add(tableInfo);
@@ -80,6 +85,16 @@ public void unRegisterCachingIcebergCatalog(String catalogName) {
8085
cachingIcebergCatalogs.remove(catalogName);
8186
}
8287

88+
/**
 * Stores the given Paimon catalog under {@code catalogName} in {@code paimonCatalogs},
 * the map that {@code refreshPaimonCatalog()} iterates over. An entry already stored
 * under the same name is overwritten.
 *
 * @param catalogName   key the catalog is stored under
 * @param paimonCatalog native Paimon catalog to store
 */
public void registerPaimonCatalog(String catalogName, Catalog paimonCatalog) {
89+
LOG.info("register to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
90+
paimonCatalogs.put(catalogName, paimonCatalog);
91+
}
92+
93+
/**
 * Removes the entry stored under {@code catalogName} from {@code paimonCatalogs}.
 * Removing a name that is not present is a no-op apart from the log line.
 *
 * @param catalogName key of the catalog to remove
 */
public void unRegisterPaimonCatalog(String catalogName) {
94+
LOG.info("unregister to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
95+
paimonCatalogs.remove(catalogName);
96+
}
97+
8398
public ConnectorTableMetadataProcessor() {
8499
super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
85100
refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
@@ -97,6 +112,7 @@ protected void runAfterCatalogReady() {
97112
if (Config.enable_background_refresh_connector_metadata) {
98113
refreshCatalogTable();
99114
refreshIcebergCachingCatalog();
115+
refreshPaimonCatalog();
100116
}
101117
}
102118

@@ -153,6 +169,33 @@ private void refreshIcebergCachingCatalog() {
153169
}
154170
}
155171

172+
private void refreshPaimonCatalog() {
173+
List<String> catalogNames = Lists.newArrayList(paimonCatalogs.keySet());
174+
for (String catalogName : catalogNames) {
175+
Catalog paimonCatalog = paimonCatalogs.get(catalogName);
176+
if (paimonCatalog == null) {
177+
LOG.error("Failed to get paimonCatalog by catalog {}.", catalogName);
178+
continue;
179+
}
180+
LOG.info("Start to refresh paimon catalog {}", catalogName);
181+
for (String dbName : paimonCatalog.listDatabases()) {
182+
try {
183+
for (String tblName : paimonCatalog.listTables(dbName)) {
184+
List<Future<?>> futures = Lists.newArrayList();
185+
futures.add(refreshRemoteFileExecutor.submit(()
186+
-> paimonCatalog.getTable(new Identifier(dbName, tblName))));
187+
for (Future<?> future : futures) {
188+
future.get();
189+
}
190+
}
191+
} catch (Catalog.DatabaseNotExistException | ExecutionException | InterruptedException e) {
192+
throw new RuntimeException(e);
193+
}
194+
}
195+
LOG.info("Finish to refresh paimon catalog {}", catalogName);
196+
}
197+
}
198+
156199
private void refreshRegisteredTable() {
157200
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
158201
List<BaseTableInfo> registeredTableInfoList = Lists.newArrayList(registeredTableInfos);

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.starrocks.credential.aliyun.AliyunCloudCredential;
3030
import com.starrocks.credential.aws.AwsCloudConfiguration;
3131
import com.starrocks.credential.aws.AwsCloudCredential;
32+
import com.starrocks.server.GlobalStateMgr;
3233
import org.apache.hadoop.conf.Configuration;
3334
import org.apache.paimon.catalog.Catalog;
3435
import org.apache.paimon.catalog.CatalogContext;
@@ -91,6 +92,16 @@ public PaimonConnector(ConnectorContext context) {
9192
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
9293
}
9394
initFsOption(cloudConfiguration);
95+
96+
// cache expire time, set to 2h
97+
this.paimonOptions.set("cache.expiration-interval", "7200s");
98+
// max num of cached partitions of a Paimon catalog
99+
this.paimonOptions.set("cache.partition.max-num", "1000");
100+
// max size of cached manifest files, 10m means cache all since files usually no more than 8m
101+
this.paimonOptions.set("cache.manifest.small-file-threshold", "10m");
102+
// max size of memory manifest cache uses
103+
this.paimonOptions.set("cache.manifest.small-file-memory", "1g");
104+
94105
String keyPrefix = "paimon.option.";
95106
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
96107
for (String k : optionKeys) {
@@ -139,6 +150,8 @@ public Catalog getPaimonNativeCatalog() {
139150
Configuration configuration = new Configuration();
140151
hdfsEnvironment.getCloudConfiguration().applyToConfiguration(configuration);
141152
this.paimonNativeCatalog = CatalogFactory.createCatalog(CatalogContext.create(getPaimonOptions(), configuration));
153+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
154+
.registerPaimonCatalog(catalogName, this.paimonNativeCatalog);
142155
}
143156
return paimonNativeCatalog;
144157
}
@@ -147,4 +160,9 @@ public Catalog getPaimonNativeCatalog() {
147160
public ConnectorMetadata getMetadata() {
148161
return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog(), connectorProperties);
149162
}
163+
164+
/**
 * Unregisters this connector's catalog name from the ConnectorTableMetadataProcessor
 * obtained through GlobalStateMgr.
 */
@Override
165+
public void shutdown() {
166+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterPaimonCatalog(catalogName);
167+
}
150168
}

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java

Lines changed: 98 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.starrocks.catalog.PartitionKey;
2323
import com.starrocks.catalog.Table;
2424
import com.starrocks.catalog.Type;
25+
import com.starrocks.common.Config;
2526
import com.starrocks.connector.ColumnTypeConverter;
2627
import com.starrocks.connector.ConnectorMetadatRequestContext;
2728
import com.starrocks.connector.ConnectorMetadata;
@@ -45,16 +46,17 @@
4546
import com.starrocks.sql.optimizer.statistics.Statistics;
4647
import org.apache.logging.log4j.LogManager;
4748
import org.apache.logging.log4j.Logger;
49+
import org.apache.paimon.catalog.CachingCatalog;
4850
import org.apache.paimon.catalog.Catalog;
4951
import org.apache.paimon.catalog.Identifier;
5052
import org.apache.paimon.data.InternalRow;
53+
import org.apache.paimon.data.Timestamp;
5154
import org.apache.paimon.predicate.Predicate;
5255
import org.apache.paimon.predicate.PredicateBuilder;
5356
import org.apache.paimon.reader.RecordReader;
5457
import org.apache.paimon.reader.RecordReaderIterator;
5558
import org.apache.paimon.table.source.ReadBuilder;
5659
import org.apache.paimon.table.source.Split;
57-
import org.apache.paimon.table.system.PartitionsTable;
5860
import org.apache.paimon.table.system.SchemasTable;
5961
import org.apache.paimon.table.system.SnapshotsTable;
6062
import org.apache.paimon.types.DataField;
@@ -63,8 +65,11 @@
6365
import org.apache.paimon.types.DateType;
6466
import org.apache.paimon.types.RowType;
6567
import org.apache.paimon.utils.DateTimeUtils;
68+
import org.apache.paimon.utils.PartitionPathUtils;
6669

6770
import java.util.ArrayList;
71+
import java.util.Arrays;
72+
import java.util.HashMap;
6873
import java.util.List;
6974
import java.util.Map;
7075
import java.util.concurrent.ConcurrentHashMap;
@@ -80,7 +85,7 @@ public class PaimonMetadata implements ConnectorMetadata {
8085
private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();
8186
private final Map<String, Database> databases = new ConcurrentHashMap<>();
8287
private final Map<PredicateSearchKey, PaimonSplitsInfo> paimonSplits = new ConcurrentHashMap<>();
83-
private final Map<String, Long> partitionInfos = new ConcurrentHashMap<>();
88+
private final Map<String, Partition> partitionInfos = new ConcurrentHashMap<>();
8489
private final ConnectorProperties properties;
8590

8691
public PaimonMetadata(String catalogName, HdfsEnvironment hdfsEnvironment, Catalog paimonNativeCatalog,
@@ -130,54 +135,54 @@ private void updatePartitionInfo(String databaseName, String tableName) {
130135
partitionColumnTypes.add(dataTableRowType.getTypeAt(dataTableRowType.getFieldIndex(partitionColumnName)));
131136
}
132137

133-
Identifier partitionTableIdentifier = new Identifier(databaseName, String.format("%s%s", tableName, "$partitions"));
134-
RecordReaderIterator<InternalRow> iterator = null;
135138
try {
136-
PartitionsTable table = (PartitionsTable) paimonNativeCatalog.getTable(partitionTableIdentifier);
137-
RowType partitionTableRowType = table.rowType();
138-
DataType lastUpdateTimeType = partitionTableRowType.getTypeAt(partitionTableRowType
139-
.getFieldIndex("last_update_time"));
140-
int[] projected = new int[] {0, 4};
141-
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
142-
.newRead().createReader(table.newScan().plan());
143-
iterator = new RecordReaderIterator<>(recordReader);
144-
while (iterator.hasNext()) {
145-
InternalRow rowData = iterator.next();
146-
String partition = rowData.getString(0).toString();
147-
org.apache.paimon.data.Timestamp lastUpdateTime = rowData.getTimestamp(1,
148-
DataTypeChecks.getPrecision(lastUpdateTimeType));
149-
String[] partitionValues = partition.replace("[", "").replace("]", "")
150-
.split(",");
151-
if (partitionValues.length != partitionColumnNames.size()) {
152-
String errorMsg = String.format("The length of partitionValues %s is not equal to " +
153-
"the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
154-
throw new IllegalArgumentException(errorMsg);
155-
}
156-
StringBuilder sb = new StringBuilder();
157-
for (int i = 0; i < partitionValues.length; i++) {
158-
String column = partitionColumnNames.get(i);
159-
String value = partitionValues[i].trim();
160-
if (partitionColumnTypes.get(i) instanceof DateType) {
161-
value = DateTimeUtils.formatDate(Integer.parseInt(value));
162-
}
163-
sb.append(column).append("=").append(value);
164-
sb.append("/");
165-
}
166-
sb.deleteCharAt(sb.length() - 1);
167-
String partitionName = sb.toString();
168-
this.partitionInfos.put(partitionName, lastUpdateTime.getMillisecond());
139+
List<org.apache.paimon.partition.Partition> partitions = paimonNativeCatalog.listPartitions(identifier);
140+
for (org.apache.paimon.partition.Partition partition : partitions) {
141+
String partitionPath = PartitionPathUtils
142+
.generatePartitionPath(partition.spec(), dataTableRowType);
143+
String[] partitionValues = Arrays.stream(partitionPath.split("/"))
144+
.map(part -> part.split("=")[1])
145+
.toArray(String[]::new);
146+
Partition srPartition = getPartition(partition.recordCount(),
147+
partition.fileSizeInBytes(), partition.fileCount(),
148+
partitionColumnNames, partitionColumnTypes, partitionValues,
149+
Timestamp.fromEpochMillis(partition.lastFileCreationTime()));
150+
this.partitionInfos.put(srPartition.getPartitionName(), srPartition);
169151
}
170-
} catch (Exception e) {
152+
return;
153+
} catch (Catalog.TableNotExistException e) {
171154
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
172-
} finally {
173-
if (iterator != null) {
174-
try {
175-
iterator.close();
176-
} catch (Exception e) {
177-
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
178-
}
155+
}
156+
}
157+
158+
private Partition getPartition(Long recordCount,
159+
Long fileSizeInBytes,
160+
Long fileCount,
161+
List<String> partitionColumnNames,
162+
List<DataType> partitionColumnTypes,
163+
String[] partitionValues,
164+
org.apache.paimon.data.Timestamp lastUpdateTime) {
165+
if (partitionValues.length != partitionColumnNames.size()) {
166+
String errorMsg = String.format("The length of partitionValues %s is not equal to " +
167+
"the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
168+
throw new IllegalArgumentException(errorMsg);
169+
}
170+
171+
StringBuilder sb = new StringBuilder();
172+
for (int i = 0; i < partitionValues.length; i++) {
173+
String column = partitionColumnNames.get(i);
174+
String value = partitionValues[i].trim();
175+
if (partitionColumnTypes.get(i) instanceof DateType) {
176+
value = DateTimeUtils.formatDate(Integer.parseInt(value));
179177
}
178+
sb.append(column).append("=").append(value);
179+
sb.append("/");
180180
}
181+
sb.deleteCharAt(sb.length() - 1);
182+
String partitionName = sb.toString();
183+
184+
return new Partition(partitionName, fileCount);
185+
181186
}
182187

183188
@Override
@@ -348,7 +353,7 @@ public long getTableCreateTime(String dbName, String tblName) {
348353
DataType updateTimeType = rowType.getTypeAt(rowType.getFieldIndex("update_time"));
349354
int[] projected = new int[] {0, 6};
350355
PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
351-
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0);
356+
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0L);
352357
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
353358
.withFilter(equal).newRead().createReader(table.newScan().plan());
354359
iterator = new RecordReaderIterator<>(recordReader);
@@ -429,11 +434,57 @@ public List<PartitionInfo> getPartitions(Table table, List<String> partitionName
429434
this.updatePartitionInfo(paimonTable.getCatalogDBName(), paimonTable.getCatalogTableName());
430435
}
431436
if (this.partitionInfos.get(partitionName) != null) {
432-
result.add(new Partition(partitionName, this.partitionInfos.get(partitionName)));
437+
result.add(this.partitionInfos.get(partitionName));
433438
} else {
434439
LOG.warn("Cannot find the paimon partition info: {}", partitionName);
435440
}
436441
}
437442
return result;
438443
}
444+
445+
@Override
446+
public void refreshTable(String srDbName, Table table, List<String> partitionNames, boolean onlyCachedPartitions) {
447+
String tableName = table.getCatalogTableName();
448+
Identifier identifier = new Identifier(srDbName, tableName);
449+
paimonNativeCatalog.invalidateTable(identifier);
450+
try {
451+
((PaimonTable) table).setPaimonNativeTable(paimonNativeCatalog.getTable(identifier));
452+
if (partitionNames != null && !partitionNames.isEmpty()) {
453+
// todo: do not support refresh an exact partition
454+
this.refreshPartitionInfo(identifier);
455+
} else {
456+
this.refreshPartitionInfo(identifier);
457+
}
458+
// Preheat manifest files, disabled by default
459+
if (Config.enable_paimon_refresh_manifest_files) {
460+
if (partitionNames == null || partitionNames.isEmpty()) {
461+
((PaimonTable) table).getNativeTable().newReadBuilder().newScan().plan();
462+
} else {
463+
List<String> partitionColumnNames = table.getPartitionColumnNames();
464+
Map<String, String> partitionSpec = new HashMap<>();
465+
for (String partitionName : partitionNames) {
466+
partitionSpec.put(String.join(",", partitionColumnNames), partitionName);
467+
}
468+
((PaimonTable) table).getNativeTable().newReadBuilder()
469+
.withPartitionFilter(partitionSpec).newScan().plan();
470+
}
471+
}
472+
tables.put(identifier, table);
473+
} catch (Exception e) {
474+
LOG.error("Failed to refresh table {}.{}.{}.", catalogName, srDbName, tableName, e);
475+
}
476+
}
477+
478+
private void refreshPartitionInfo(Identifier identifier) {
479+
if (paimonNativeCatalog instanceof CachingCatalog) {
480+
try {
481+
paimonNativeCatalog.invalidateTable(identifier);
482+
((CachingCatalog) paimonNativeCatalog).refreshPartitions(identifier);
483+
} catch (Catalog.TableNotExistException e) {
484+
throw new RuntimeException(e);
485+
}
486+
} else {
487+
LOG.warn("Current catalog {} does not support cache.", catalogName);
488+
}
489+
}
439490
}

fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2504,7 +2504,7 @@ public Future<TStatus> refreshOtherFesTable(TNetworkAddress thriftAddress, Table
25042504

25052505
private boolean supportRefreshTableType(Table table) {
25062506
return table.isHiveTable() || table.isHudiTable() || table.isHiveView() || table.isIcebergTable()
2507-
|| table.isJDBCTable() || table.isDeltalakeTable();
2507+
|| table.isJDBCTable() || table.isDeltalakeTable() || table.isPaimonTable();
25082508
}
25092509

25102510
public void refreshExternalTable(TableName tableName, List<String> partitions) {

0 commit comments

Comments
 (0)