Skip to content

Commit 264dd64

Browse files
mxdzs0612mergify[bot]
authored andcommitted
[Enhancement] Support Paimon manifest cache (#55788)
Signed-off-by: Jiao Mingye <[email protected]> (cherry picked from commit bc8e49c)
1 parent 95d6641 commit 264dd64

File tree

7 files changed

+192
-109
lines changed

7 files changed

+192
-109
lines changed

fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public org.apache.paimon.table.Table getNativeTable() {
7878
return paimonNativeTable;
7979
}
8080

81+
// For refresh table only
82+
public void setPaimonNativeTable(org.apache.paimon.table.Table paimonNativeTable) {
83+
this.paimonNativeTable = paimonNativeTable;
84+
}
85+
8186
@Override
8287
public String getUUID() {
8388
return String.join(".", catalogName, databaseName, tableName, Long.toString(createTime));

fe/fe-core/src/main/java/com/starrocks/common/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,6 +2443,12 @@ public class Config extends ConfigBase {
24432443
@ConfField(mutable = true)
24442444
public static long iceberg_metadata_cache_max_entry_size = 8388608L;
24452445

2446+
/**
2447+
* paimon metadata cache preheat, default false
2448+
*/
2449+
@ConfField(mutable = true)
2450+
public static boolean enable_paimon_refresh_manifest_files = false;
2451+
24462452
/**
24472453
* fe will call es api to get es index shard info every es_state_sync_interval_secs
24482454
*/

fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.starrocks.connector.hive;
1616

17+
import com.google.common.annotations.VisibleForTesting;
1718
import com.google.common.collect.Lists;
1819
import com.google.common.collect.Sets;
1920
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,9 @@
3233
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
3334
import org.apache.logging.log4j.LogManager;
3435
import org.apache.logging.log4j.Logger;
36+
import org.apache.paimon.catalog.CachingCatalog;
37+
import org.apache.paimon.catalog.Catalog;
38+
import org.apache.paimon.catalog.Identifier;
3539

3640
import java.util.List;
3741
import java.util.Map;
@@ -41,6 +45,7 @@
4145
import java.util.concurrent.ConcurrentHashMap;
4246
import java.util.concurrent.ExecutorService;
4347
import java.util.concurrent.Executors;
48+
import java.util.concurrent.Future;
4449
import java.util.stream.Collectors;
4550

4651
public class ConnectorTableMetadataProcessor extends FrontendDaemon {
@@ -52,6 +57,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {
5257

5358
private final ExecutorService refreshRemoteFileExecutor;
5459
private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
60+
private final Map<String, Catalog> paimonCatalogs = new ConcurrentHashMap<>();
5561

5662
public void registerTableInfo(BaseTableInfo tableInfo) {
5763
registeredTableInfos.add(tableInfo);
@@ -77,6 +83,16 @@ public void unRegisterCachingIcebergCatalog(String catalogName) {
7783
cachingIcebergCatalogs.remove(catalogName);
7884
}
7985

86+
public void registerPaimonCatalog(String catalogName, Catalog paimonCatalog) {
87+
LOG.info("register to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
88+
paimonCatalogs.put(catalogName, paimonCatalog);
89+
}
90+
91+
public void unRegisterPaimonCatalog(String catalogName) {
92+
LOG.info("unregister to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
93+
paimonCatalogs.remove(catalogName);
94+
}
95+
8096
public ConnectorTableMetadataProcessor() {
8197
super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
8298
refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
@@ -94,6 +110,7 @@ protected void runAfterCatalogReady() {
94110
if (Config.enable_background_refresh_connector_metadata) {
95111
refreshCatalogTable();
96112
refreshIcebergCachingCatalog();
113+
refreshPaimonCatalog();
97114
}
98115
}
99116

@@ -148,6 +165,48 @@ private void refreshIcebergCachingCatalog() {
148165
}
149166
}
150167

168+
@VisibleForTesting
169+
public void refreshPaimonCatalog() {
170+
List<String> catalogNames = Lists.newArrayList(paimonCatalogs.keySet());
171+
for (String catalogName : catalogNames) {
172+
Catalog paimonCatalog = paimonCatalogs.get(catalogName);
173+
if (paimonCatalog == null) {
174+
LOG.error("Failed to get paimonCatalog by catalog {}.", catalogName);
175+
continue;
176+
}
177+
LOG.info("Start to refresh paimon catalog {}", catalogName);
178+
for (String dbName : paimonCatalog.listDatabases()) {
179+
try {
180+
for (String tblName : paimonCatalog.listTables(dbName)) {
181+
List<Future<?>> futures = Lists.newArrayList();
182+
futures.add(refreshRemoteFileExecutor.submit(() ->
183+
paimonCatalog.invalidateTable(new Identifier(dbName, tblName))
184+
));
185+
futures.add(refreshRemoteFileExecutor.submit(() ->
186+
paimonCatalog.getTable(new Identifier(dbName, tblName))
187+
));
188+
if (paimonCatalog instanceof CachingCatalog) {
189+
futures.add(refreshRemoteFileExecutor.submit(() -> {
190+
try {
191+
((CachingCatalog) paimonCatalog).refreshPartitions(new Identifier(dbName, tblName));
192+
} catch (Catalog.TableNotExistException e) {
193+
throw new RuntimeException(e);
194+
}
195+
}
196+
));
197+
}
198+
for (Future<?> future : futures) {
199+
future.get();
200+
}
201+
}
202+
} catch (Exception e) {
203+
throw new RuntimeException(e);
204+
}
205+
}
206+
LOG.info("Finish to refresh paimon catalog {}", catalogName);
207+
}
208+
}
209+
151210
private void refreshRegisteredTable() {
152211
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
153212
List<BaseTableInfo> registeredTableInfoList = Lists.newArrayList(registeredTableInfos);

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.starrocks.credential.aliyun.AliyunCloudCredential;
3030
import com.starrocks.credential.aws.AwsCloudConfiguration;
3131
import com.starrocks.credential.aws.AwsCloudCredential;
32+
import com.starrocks.server.GlobalStateMgr;
3233
import org.apache.hadoop.conf.Configuration;
3334
import org.apache.paimon.catalog.Catalog;
3435
import org.apache.paimon.catalog.CatalogContext;
@@ -91,6 +92,16 @@ public PaimonConnector(ConnectorContext context) {
9192
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
9293
}
9394
initFsOption(cloudConfiguration);
95+
96+
// cache expire time, set to 2h
97+
this.paimonOptions.set("cache.expiration-interval", "7200s");
98+
// max num of cached partitions of a Paimon catalog
99+
this.paimonOptions.set("cache.partition.max-num", "1000");
100+
// max size of cached manifest files, 10m means cache all since files usually no more than 8m
101+
this.paimonOptions.set("cache.manifest.small-file-threshold", "10m");
102+
// max size of memory manifest cache uses
103+
this.paimonOptions.set("cache.manifest.small-file-memory", "1g");
104+
94105
String keyPrefix = "paimon.option.";
95106
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
96107
for (String k : optionKeys) {
@@ -139,6 +150,8 @@ public Catalog getPaimonNativeCatalog() {
139150
Configuration configuration = new Configuration();
140151
hdfsEnvironment.getCloudConfiguration().applyToConfiguration(configuration);
141152
this.paimonNativeCatalog = CatalogFactory.createCatalog(CatalogContext.create(getPaimonOptions(), configuration));
153+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
154+
.registerPaimonCatalog(catalogName, this.paimonNativeCatalog);
142155
}
143156
return paimonNativeCatalog;
144157
}
@@ -147,4 +160,9 @@ public Catalog getPaimonNativeCatalog() {
147160
public ConnectorMetadata getMetadata() {
148161
return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog(), connectorProperties);
149162
}
163+
164+
@Override
165+
public void shutdown() {
166+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterPaimonCatalog(catalogName);
167+
}
150168
}

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.starrocks.catalog.PartitionKey;
2323
import com.starrocks.catalog.Table;
2424
import com.starrocks.catalog.Type;
25+
import com.starrocks.common.Config;
2526
import com.starrocks.common.profile.Timer;
2627
import com.starrocks.common.profile.Tracers;
2728
import com.starrocks.connector.ColumnTypeConverter;
@@ -65,7 +66,6 @@
6566
import org.apache.paimon.table.source.InnerTableScan;
6667
import org.apache.paimon.table.source.ReadBuilder;
6768
import org.apache.paimon.table.source.Split;
68-
import org.apache.paimon.table.system.PartitionsTable;
6969
import org.apache.paimon.table.system.SchemasTable;
7070
import org.apache.paimon.table.system.SnapshotsTable;
7171
import org.apache.paimon.types.DataField;
@@ -74,11 +74,14 @@
7474
import org.apache.paimon.types.DateType;
7575
import org.apache.paimon.types.RowType;
7676
import org.apache.paimon.utils.DateTimeUtils;
77+
import org.apache.paimon.utils.PartitionPathUtils;
7778

7879
import java.time.LocalDateTime;
7980
import java.time.ZoneId;
8081
import java.time.ZonedDateTime;
8182
import java.util.ArrayList;
83+
import java.util.Arrays;
84+
import java.util.HashMap;
8285
import java.util.List;
8386
import java.util.Map;
8487
import java.util.Optional;
@@ -150,39 +153,21 @@ private void updatePartitionInfo(String databaseName, String tableName) {
150153
partitionColumnTypes.add(dataTableRowType.getTypeAt(dataTableRowType.getFieldIndex(partitionColumnName)));
151154
}
152155

153-
Identifier partitionTableIdentifier = new Identifier(databaseName, String.format("%s%s", tableName, "$partitions"));
154-
RecordReaderIterator<InternalRow> iterator = null;
155156
try {
156-
PartitionsTable table = (PartitionsTable) paimonNativeCatalog.getTable(partitionTableIdentifier);
157-
RowType partitionTableRowType = table.rowType();
158-
DataType lastUpdateTimeType = partitionTableRowType.getTypeAt(partitionTableRowType
159-
.getFieldIndex("last_update_time"));
160-
int[] projected = new int[] {0, 1, 2, 3, 4};
161-
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
162-
.newRead().createReader(table.newScan().plan());
163-
iterator = new RecordReaderIterator<>(recordReader);
164-
while (iterator.hasNext()) {
165-
InternalRow rowData = iterator.next();
166-
String partitionStr = rowData.getString(0).toString();
167-
org.apache.paimon.data.Timestamp lastUpdateTime = rowData.getTimestamp(4,
168-
DataTypeChecks.getPrecision(lastUpdateTimeType));
169-
String[] partitionValues = partitionStr.replace("[", "").replace("]", "")
170-
.split(",");
171-
Partition partition =
172-
getPartition(rowData.getLong(1), rowData.getLong(2), rowData.getLong(3),
173-
partitionColumnNames, partitionColumnTypes, partitionValues, lastUpdateTime);
174-
this.partitionInfos.put(partition.getPartitionName(), partition);
157+
List<org.apache.paimon.partition.Partition> partitions = paimonNativeCatalog.listPartitions(identifier);
158+
for (org.apache.paimon.partition.Partition partition : partitions) {
159+
String partitionPath = PartitionPathUtils.generatePartitionPath(partition.spec(), dataTableRowType);
160+
String[] partitionValues = Arrays.stream(partitionPath.split("/"))
161+
.map(part -> part.split("=")[1])
162+
.toArray(String[]::new);
163+
Partition srPartition = getPartition(partition.recordCount(),
164+
partition.fileSizeInBytes(), partition.fileCount(),
165+
partitionColumnNames, partitionColumnTypes, partitionValues,
166+
Timestamp.fromEpochMillis(partition.lastFileCreationTime()));
167+
this.partitionInfos.put(srPartition.getPartitionName(), srPartition);
175168
}
176-
} catch (Exception e) {
169+
} catch (Catalog.TableNotExistException e) {
177170
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
178-
} finally {
179-
if (iterator != null) {
180-
try {
181-
iterator.close();
182-
} catch (Exception e) {
183-
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
184-
}
185-
}
186171
}
187172
}
188173

@@ -192,7 +177,7 @@ private Partition getPartition(Long recordCount,
192177
List<String> partitionColumnNames,
193178
List<DataType> partitionColumnTypes,
194179
String[] partitionValues,
195-
org.apache.paimon.data.Timestamp lastUpdateTime) {
180+
Timestamp lastUpdateTime) {
196181
if (partitionValues.length != partitionColumnNames.size()) {
197182
String errorMsg = String.format("The length of partitionValues %s is not equal to " +
198183
"the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
@@ -214,10 +199,9 @@ private Partition getPartition(Long recordCount,
214199

215200
return new Partition(partitionName, convertToSystemDefaultTime(lastUpdateTime),
216201
recordCount, fileSizeInBytes, fileCount);
217-
218202
}
219203

220-
private Long convertToSystemDefaultTime(org.apache.paimon.data.Timestamp lastUpdateTime) {
204+
private Long convertToSystemDefaultTime(Timestamp lastUpdateTime) {
221205
LocalDateTime localDateTime = lastUpdateTime.toLocalDateTime();
222206
ZoneId zoneId = ZoneId.systemDefault();
223207
ZonedDateTime zonedDateTime = localDateTime.atZone(zoneId);
@@ -533,15 +517,14 @@ public long getTableCreateTime(String dbName, String tblName) {
533517
DataType updateTimeType = rowType.getTypeAt(rowType.getFieldIndex("update_time"));
534518
int[] projected = new int[] {0, 6};
535519
PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
536-
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0);
520+
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0L);
537521
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
538522
.withFilter(equal).newRead().createReader(table.newScan().plan());
539523
iterator = new RecordReaderIterator<>(recordReader);
540524
while (iterator.hasNext()) {
541525
InternalRow rowData = iterator.next();
542526
Long schemaIdValue = rowData.getLong(0);
543-
org.apache.paimon.data.Timestamp updateTime = rowData
544-
.getTimestamp(1, DataTypeChecks.getPrecision(updateTimeType));
527+
Timestamp updateTime = rowData.getTimestamp(1, DataTypeChecks.getPrecision(updateTimeType));
545528
if (schemaIdValue == 0) {
546529
return updateTime.getMillisecond();
547530
}
@@ -577,8 +560,7 @@ public long getTableUpdateTime(String dbName, String tblName) {
577560
iterator = new RecordReaderIterator<>(recordReader);
578561
while (iterator.hasNext()) {
579562
InternalRow rowData = iterator.next();
580-
org.apache.paimon.data.Timestamp commitTime = rowData
581-
.getTimestamp(0, DataTypeChecks.getPrecision(commitTimeType));
563+
Timestamp commitTime = rowData.getTimestamp(0, DataTypeChecks.getPrecision(commitTimeType));
582564
if (convertToSystemDefaultTime(commitTime) > lastCommitTime) {
583565
lastCommitTime = convertToSystemDefaultTime(commitTime);
584566
}
@@ -623,4 +605,50 @@ public List<PartitionInfo> getPartitions(Table table, List<String> partitionName
623605
}
624606
return result;
625607
}
608+
609+
@Override
610+
public void refreshTable(String srDbName, Table table, List<String> partitionNames, boolean onlyCachedPartitions) {
611+
String tableName = table.getCatalogTableName();
612+
Identifier identifier = new Identifier(srDbName, tableName);
613+
paimonNativeCatalog.invalidateTable(identifier);
614+
try {
615+
((PaimonTable) table).setPaimonNativeTable(paimonNativeCatalog.getTable(identifier));
616+
if (partitionNames != null && !partitionNames.isEmpty()) {
617+
// todo: paimon does not support to refresh an exact partition
618+
this.refreshPartitionInfo(identifier);
619+
} else {
620+
this.refreshPartitionInfo(identifier);
621+
}
622+
// Preheat manifest files, disabled by default
623+
if (Config.enable_paimon_refresh_manifest_files) {
624+
if (partitionNames == null || partitionNames.isEmpty()) {
625+
((PaimonTable) table).getNativeTable().newReadBuilder().newScan().plan();
626+
} else {
627+
List<String> partitionColumnNames = table.getPartitionColumnNames();
628+
Map<String, String> partitionSpec = new HashMap<>();
629+
for (String partitionName : partitionNames) {
630+
partitionSpec.put(String.join(",", partitionColumnNames), partitionName);
631+
}
632+
((PaimonTable) table).getNativeTable().newReadBuilder()
633+
.withPartitionFilter(partitionSpec).newScan().plan();
634+
}
635+
}
636+
tables.put(identifier, table);
637+
} catch (Exception e) {
638+
LOG.error("Failed to refresh table {}.{}.{}.", catalogName, srDbName, tableName, e);
639+
}
640+
}
641+
642+
private void refreshPartitionInfo(Identifier identifier) {
643+
if (paimonNativeCatalog instanceof CachingCatalog) {
644+
try {
645+
paimonNativeCatalog.invalidateTable(identifier);
646+
((CachingCatalog) paimonNativeCatalog).refreshPartitions(identifier);
647+
} catch (Catalog.TableNotExistException e) {
648+
throw new RuntimeException(e);
649+
}
650+
} else {
651+
LOG.warn("Current catalog {} does not support cache.", catalogName);
652+
}
653+
}
626654
}

fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2474,7 +2474,7 @@ public Future<TStatus> refreshOtherFesTable(TNetworkAddress thriftAddress, Table
24742474

24752475
private boolean supportRefreshTableType(Table table) {
24762476
return table.isHiveTable() || table.isHudiTable() || table.isHiveView() || table.isIcebergTable()
2477-
|| table.isJDBCTable() || table.isDeltalakeTable();
2477+
|| table.isJDBCTable() || table.isDeltalakeTable() || table.isPaimonTable();
24782478
}
24792479

24802480
public void refreshExternalTable(TableName tableName, List<String> partitions) {

0 commit comments

Comments
 (0)