Skip to content

Commit c1bc3d5

Browse files
committed
[Enhancement] Support Paimon manifest cache (backport StarRocks#55788)
Signed-off-by: Jiao Mingye <[email protected]>
1 parent 8b0b0c8 commit c1bc3d5

File tree

7 files changed

+192
-109
lines changed

7 files changed

+192
-109
lines changed

fe/fe-core/src/main/java/com/starrocks/catalog/PaimonTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public org.apache.paimon.table.Table getNativeTable() {
7878
return paimonNativeTable;
7979
}
8080

81+
// For refresh table only
82+
public void setPaimonNativeTable(org.apache.paimon.table.Table paimonNativeTable) {
83+
this.paimonNativeTable = paimonNativeTable;
84+
}
85+
8186
@Override
8287
public String getUUID() {
8388
return String.join(".", catalogName, databaseName, tableName, Long.toString(createTime));

fe/fe-core/src/main/java/com/starrocks/common/Config.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2405,6 +2405,12 @@ public class Config extends ConfigBase {
24052405
@ConfField(mutable = true)
24062406
public static long iceberg_metadata_cache_max_entry_size = 8388608L;
24072407

2408+
/**
2409+
* Whether refreshing a Paimon table also preheats its manifest files by planning a scan; disabled by default.
2410+
*/
2411+
@ConfField(mutable = true)
2412+
public static boolean enable_paimon_refresh_manifest_files = false;
2413+
24082414
/**
24092415
* fe will call es api to get es index shard info every es_state_sync_interval_secs
24102416
*/

fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.starrocks.connector.hive;
1616

17+
import com.google.common.annotations.VisibleForTesting;
1718
import com.google.common.collect.Lists;
1819
import com.google.common.collect.Sets;
1920
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,9 @@
3233
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
3334
import org.apache.logging.log4j.LogManager;
3435
import org.apache.logging.log4j.Logger;
36+
import org.apache.paimon.catalog.CachingCatalog;
37+
import org.apache.paimon.catalog.Catalog;
38+
import org.apache.paimon.catalog.Identifier;
3539

3640
import java.util.List;
3741
import java.util.Map;
@@ -41,6 +45,7 @@
4145
import java.util.concurrent.ConcurrentHashMap;
4246
import java.util.concurrent.ExecutorService;
4347
import java.util.concurrent.Executors;
48+
import java.util.concurrent.Future;
4449
import java.util.stream.Collectors;
4550

4651
public class ConnectorTableMetadataProcessor extends FrontendDaemon {
@@ -52,6 +57,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {
5257

5358
private final ExecutorService refreshRemoteFileExecutor;
5459
private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
60+
private final Map<String, Catalog> paimonCatalogs = new ConcurrentHashMap<>();
5561

5662
public void registerTableInfo(BaseTableInfo tableInfo) {
5763
registeredTableInfos.add(tableInfo);
@@ -77,6 +83,16 @@ public void unRegisterCachingIcebergCatalog(String catalogName) {
7783
cachingIcebergCatalogs.remove(catalogName);
7884
}
7985

86+
public void registerPaimonCatalog(String catalogName, Catalog paimonCatalog) {
87+
LOG.info("register to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
88+
paimonCatalogs.put(catalogName, paimonCatalog);
89+
}
90+
91+
public void unRegisterPaimonCatalog(String catalogName) {
92+
LOG.info("unregister to caching paimon catalog on {} in the ConnectorTableMetadataProcessor", catalogName);
93+
paimonCatalogs.remove(catalogName);
94+
}
95+
8096
public ConnectorTableMetadataProcessor() {
8197
super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
8298
refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
@@ -94,6 +110,7 @@ protected void runAfterCatalogReady() {
94110
if (Config.enable_background_refresh_connector_metadata) {
95111
refreshCatalogTable();
96112
refreshIcebergCachingCatalog();
113+
refreshPaimonCatalog();
97114
}
98115
}
99116

@@ -148,6 +165,48 @@ private void refreshIcebergCachingCatalog() {
148165
}
149166
}
150167

168+
@VisibleForTesting
169+
public void refreshPaimonCatalog() {
170+
List<String> catalogNames = Lists.newArrayList(paimonCatalogs.keySet());
171+
for (String catalogName : catalogNames) {
172+
Catalog paimonCatalog = paimonCatalogs.get(catalogName);
173+
if (paimonCatalog == null) {
174+
LOG.error("Failed to get paimonCatalog by catalog {}.", catalogName);
175+
continue;
176+
}
177+
LOG.info("Start to refresh paimon catalog {}", catalogName);
178+
for (String dbName : paimonCatalog.listDatabases()) {
179+
try {
180+
for (String tblName : paimonCatalog.listTables(dbName)) {
181+
List<Future<?>> futures = Lists.newArrayList();
182+
futures.add(refreshRemoteFileExecutor.submit(() ->
183+
paimonCatalog.invalidateTable(new Identifier(dbName, tblName))
184+
));
185+
futures.add(refreshRemoteFileExecutor.submit(() ->
186+
paimonCatalog.getTable(new Identifier(dbName, tblName))
187+
));
188+
if (paimonCatalog instanceof CachingCatalog) {
189+
futures.add(refreshRemoteFileExecutor.submit(() -> {
190+
try {
191+
((CachingCatalog) paimonCatalog).refreshPartitions(new Identifier(dbName, tblName));
192+
} catch (Catalog.TableNotExistException e) {
193+
throw new RuntimeException(e);
194+
}
195+
}
196+
));
197+
}
198+
for (Future<?> future : futures) {
199+
future.get();
200+
}
201+
}
202+
} catch (Exception e) {
203+
throw new RuntimeException(e);
204+
}
205+
}
206+
LOG.info("Finish to refresh paimon catalog {}", catalogName);
207+
}
208+
}
209+
151210
private void refreshRegisteredTable() {
152211
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
153212
List<BaseTableInfo> registeredTableInfoList = Lists.newArrayList(registeredTableInfos);

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.starrocks.credential.aliyun.AliyunCloudCredential;
2828
import com.starrocks.credential.aws.AwsCloudConfiguration;
2929
import com.starrocks.credential.aws.AwsCloudCredential;
30+
import com.starrocks.server.GlobalStateMgr;
3031
import org.apache.hadoop.conf.Configuration;
3132
import org.apache.paimon.catalog.Catalog;
3233
import org.apache.paimon.catalog.CatalogContext;
@@ -87,6 +88,16 @@ public PaimonConnector(ConnectorContext context) {
8788
paimonOptions.setString(WAREHOUSE.key(), warehousePath);
8889
}
8990
initFsOption(cloudConfiguration);
91+
92+
// cache expire time, set to 2h
93+
this.paimonOptions.set("cache.expiration-interval", "7200s");
94+
// max num of cached partitions of a Paimon catalog
95+
this.paimonOptions.set("cache.partition.max-num", "1000");
96+
// max size of cached manifest files, 10m means cache all since files usually no more than 8m
97+
this.paimonOptions.set("cache.manifest.small-file-threshold", "10m");
98+
// max size of memory manifest cache uses
99+
this.paimonOptions.set("cache.manifest.small-file-memory", "1g");
100+
90101
String keyPrefix = "paimon.option.";
91102
Set<String> optionKeys = properties.keySet().stream().filter(k -> k.startsWith(keyPrefix)).collect(Collectors.toSet());
92103
for (String k : optionKeys) {
@@ -135,6 +146,8 @@ public Catalog getPaimonNativeCatalog() {
135146
Configuration configuration = new Configuration();
136147
hdfsEnvironment.getCloudConfiguration().applyToConfiguration(configuration);
137148
this.paimonNativeCatalog = CatalogFactory.createCatalog(CatalogContext.create(getPaimonOptions(), configuration));
149+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
150+
.registerPaimonCatalog(catalogName, this.paimonNativeCatalog);
138151
}
139152
return paimonNativeCatalog;
140153
}
@@ -143,4 +156,9 @@ public Catalog getPaimonNativeCatalog() {
143156
public ConnectorMetadata getMetadata() {
144157
return new PaimonMetadata(catalogName, hdfsEnvironment, getPaimonNativeCatalog());
145158
}
159+
160+
@Override
161+
public void shutdown() {
162+
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterPaimonCatalog(catalogName);
163+
}
146164
}

fe/fe-core/src/main/java/com/starrocks/connector/paimon/PaimonMetadata.java

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.starrocks.catalog.PartitionKey;
2323
import com.starrocks.catalog.Table;
2424
import com.starrocks.catalog.Type;
25+
import com.starrocks.common.Config;
2526
import com.starrocks.common.profile.Timer;
2627
import com.starrocks.common.profile.Tracers;
2728
import com.starrocks.connector.ColumnTypeConverter;
@@ -60,7 +61,6 @@
6061
import org.apache.paimon.table.source.InnerTableScan;
6162
import org.apache.paimon.table.source.ReadBuilder;
6263
import org.apache.paimon.table.source.Split;
63-
import org.apache.paimon.table.system.PartitionsTable;
6464
import org.apache.paimon.table.system.SchemasTable;
6565
import org.apache.paimon.table.system.SnapshotsTable;
6666
import org.apache.paimon.types.DataField;
@@ -69,11 +69,14 @@
6969
import org.apache.paimon.types.DateType;
7070
import org.apache.paimon.types.RowType;
7171
import org.apache.paimon.utils.DateTimeUtils;
72+
import org.apache.paimon.utils.PartitionPathUtils;
7273

7374
import java.time.LocalDateTime;
7475
import java.time.ZoneId;
7576
import java.time.ZonedDateTime;
7677
import java.util.ArrayList;
78+
import java.util.Arrays;
79+
import java.util.HashMap;
7780
import java.util.List;
7881
import java.util.Map;
7982
import java.util.Optional;
@@ -142,39 +145,21 @@ private void updatePartitionInfo(String databaseName, String tableName) {
142145
partitionColumnTypes.add(dataTableRowType.getTypeAt(dataTableRowType.getFieldIndex(partitionColumnName)));
143146
}
144147

145-
Identifier partitionTableIdentifier = new Identifier(databaseName, String.format("%s%s", tableName, "$partitions"));
146-
RecordReaderIterator<InternalRow> iterator = null;
147148
try {
148-
PartitionsTable table = (PartitionsTable) paimonNativeCatalog.getTable(partitionTableIdentifier);
149-
RowType partitionTableRowType = table.rowType();
150-
DataType lastUpdateTimeType = partitionTableRowType.getTypeAt(partitionTableRowType
151-
.getFieldIndex("last_update_time"));
152-
int[] projected = new int[] {0, 1, 2, 3, 4};
153-
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
154-
.newRead().createReader(table.newScan().plan());
155-
iterator = new RecordReaderIterator<>(recordReader);
156-
while (iterator.hasNext()) {
157-
InternalRow rowData = iterator.next();
158-
String partitionStr = rowData.getString(0).toString();
159-
org.apache.paimon.data.Timestamp lastUpdateTime = rowData.getTimestamp(4,
160-
DataTypeChecks.getPrecision(lastUpdateTimeType));
161-
String[] partitionValues = partitionStr.replace("[", "").replace("]", "")
162-
.split(",");
163-
Partition partition =
164-
getPartition(rowData.getLong(1), rowData.getLong(2), rowData.getLong(3),
165-
partitionColumnNames, partitionColumnTypes, partitionValues, lastUpdateTime);
166-
this.partitionInfos.put(partition.getPartitionName(), partition);
149+
List<org.apache.paimon.partition.Partition> partitions = paimonNativeCatalog.listPartitions(identifier);
150+
for (org.apache.paimon.partition.Partition partition : partitions) {
151+
String partitionPath = PartitionPathUtils.generatePartitionPath(partition.spec(), dataTableRowType);
152+
String[] partitionValues = Arrays.stream(partitionPath.split("/"))
153+
.map(part -> part.split("=")[1])
154+
.toArray(String[]::new);
155+
Partition srPartition = getPartition(partition.recordCount(),
156+
partition.fileSizeInBytes(), partition.fileCount(),
157+
partitionColumnNames, partitionColumnTypes, partitionValues,
158+
Timestamp.fromEpochMillis(partition.lastFileCreationTime()));
159+
this.partitionInfos.put(srPartition.getPartitionName(), srPartition);
167160
}
168-
} catch (Exception e) {
161+
} catch (Catalog.TableNotExistException e) {
169162
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
170-
} finally {
171-
if (iterator != null) {
172-
try {
173-
iterator.close();
174-
} catch (Exception e) {
175-
LOG.error("Failed to update partition info of paimon table {}.{}.", databaseName, tableName, e);
176-
}
177-
}
178163
}
179164
}
180165

@@ -184,7 +169,7 @@ private Partition getPartition(Long recordCount,
184169
List<String> partitionColumnNames,
185170
List<DataType> partitionColumnTypes,
186171
String[] partitionValues,
187-
org.apache.paimon.data.Timestamp lastUpdateTime) {
172+
Timestamp lastUpdateTime) {
188173
if (partitionValues.length != partitionColumnNames.size()) {
189174
String errorMsg = String.format("The length of partitionValues %s is not equal to " +
190175
"the partitionColumnNames %s.", partitionValues.length, partitionColumnNames.size());
@@ -206,10 +191,9 @@ private Partition getPartition(Long recordCount,
206191

207192
return new Partition(partitionName, convertToSystemDefaultTime(lastUpdateTime),
208193
recordCount, fileSizeInBytes, fileCount);
209-
210194
}
211195

212-
private Long convertToSystemDefaultTime(org.apache.paimon.data.Timestamp lastUpdateTime) {
196+
private Long convertToSystemDefaultTime(Timestamp lastUpdateTime) {
213197
LocalDateTime localDateTime = lastUpdateTime.toLocalDateTime();
214198
ZoneId zoneId = ZoneId.systemDefault();
215199
ZonedDateTime zonedDateTime = localDateTime.atZone(zoneId);
@@ -521,15 +505,14 @@ public long getTableCreateTime(String dbName, String tblName) {
521505
DataType updateTimeType = rowType.getTypeAt(rowType.getFieldIndex("update_time"));
522506
int[] projected = new int[] {0, 6};
523507
PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
524-
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0);
508+
Predicate equal = predicateBuilder.equal(predicateBuilder.indexOf("schema_id"), 0L);
525509
RecordReader<InternalRow> recordReader = table.newReadBuilder().withProjection(projected)
526510
.withFilter(equal).newRead().createReader(table.newScan().plan());
527511
iterator = new RecordReaderIterator<>(recordReader);
528512
while (iterator.hasNext()) {
529513
InternalRow rowData = iterator.next();
530514
Long schemaIdValue = rowData.getLong(0);
531-
org.apache.paimon.data.Timestamp updateTime = rowData
532-
.getTimestamp(1, DataTypeChecks.getPrecision(updateTimeType));
515+
Timestamp updateTime = rowData.getTimestamp(1, DataTypeChecks.getPrecision(updateTimeType));
533516
if (schemaIdValue == 0) {
534517
return updateTime.getMillisecond();
535518
}
@@ -565,8 +548,7 @@ public long getTableUpdateTime(String dbName, String tblName) {
565548
iterator = new RecordReaderIterator<>(recordReader);
566549
while (iterator.hasNext()) {
567550
InternalRow rowData = iterator.next();
568-
org.apache.paimon.data.Timestamp commitTime = rowData
569-
.getTimestamp(0, DataTypeChecks.getPrecision(commitTimeType));
551+
Timestamp commitTime = rowData.getTimestamp(0, DataTypeChecks.getPrecision(commitTimeType));
570552
if (convertToSystemDefaultTime(commitTime) > lastCommitTime) {
571553
lastCommitTime = convertToSystemDefaultTime(commitTime);
572554
}
@@ -610,4 +592,50 @@ public List<PartitionInfo> getPartitions(Table table, List<String> partitionName
610592
}
611593
return result;
612594
}
595+
596+
@Override
597+
public void refreshTable(String srDbName, Table table, List<String> partitionNames, boolean onlyCachedPartitions) {
598+
String tableName = table.getName();
599+
Identifier identifier = new Identifier(srDbName, tableName);
600+
paimonNativeCatalog.invalidateTable(identifier);
601+
try {
602+
((PaimonTable) table).setPaimonNativeTable(paimonNativeCatalog.getTable(identifier));
603+
if (partitionNames != null && !partitionNames.isEmpty()) {
604+
// todo: paimon does not support to refresh an exact partition
605+
this.refreshPartitionInfo(identifier);
606+
} else {
607+
this.refreshPartitionInfo(identifier);
608+
}
609+
// Preheat manifest files, disabled by default
610+
if (Config.enable_paimon_refresh_manifest_files) {
611+
if (partitionNames == null || partitionNames.isEmpty()) {
612+
((PaimonTable) table).getNativeTable().newReadBuilder().newScan().plan();
613+
} else {
614+
List<String> partitionColumnNames = table.getPartitionColumnNames();
615+
Map<String, String> partitionSpec = new HashMap<>();
616+
for (String partitionName : partitionNames) {
617+
partitionSpec.put(String.join(",", partitionColumnNames), partitionName);
618+
}
619+
((PaimonTable) table).getNativeTable().newReadBuilder()
620+
.withPartitionFilter(partitionSpec).newScan().plan();
621+
}
622+
}
623+
tables.put(identifier, table);
624+
} catch (Exception e) {
625+
LOG.error("Failed to refresh table {}.{}.{}.", catalogName, srDbName, tableName, e);
626+
}
627+
}
628+
629+
private void refreshPartitionInfo(Identifier identifier) {
630+
if (paimonNativeCatalog instanceof CachingCatalog) {
631+
try {
632+
paimonNativeCatalog.invalidateTable(identifier);
633+
((CachingCatalog) paimonNativeCatalog).refreshPartitions(identifier);
634+
} catch (Catalog.TableNotExistException e) {
635+
throw new RuntimeException(e);
636+
}
637+
} else {
638+
LOG.warn("Current catalog {} does not support cache.", catalogName);
639+
}
640+
}
613641
}

fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2458,7 +2458,7 @@ public Future<TStatus> refreshOtherFesTable(TNetworkAddress thriftAddress, Table
24582458

24592459
private boolean supportRefreshTableType(Table table) {
24602460
return table.isHiveTable() || table.isHudiTable() || table.isHiveView() || table.isIcebergTable()
2461-
|| table.isJDBCTable() || table.isDeltalakeTable();
2461+
|| table.isJDBCTable() || table.isDeltalakeTable() || table.isPaimonTable();
24622462
}
24632463

24642464
public void refreshExternalTable(TableName tableName, List<String> partitions) {

0 commit comments

Comments
 (0)