Skip to content

Commit 1dc523c

Browse files
authored
[Enhancement] Support analyze iceberg table with partition transform (#39907)
Signed-off-by: Youngwb <[email protected]>
1 parent b054af8 commit 1dc523c

File tree

6 files changed

+379
-4
lines changed

6 files changed

+379
-4
lines changed

fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ public List<Column> getPartitionColumnsIncludeTransformed() {
170170
return allPartitionColumns;
171171
}
172172

173+
public PartitionField getPartitionField(String partitionColumnName) {
174+
List<PartitionField> allPartitionFields = getNativeTable().spec().fields();
175+
Schema schema = this.getNativeTable().schema();
176+
for (PartitionField field : allPartitionFields) {
177+
if (getPartitionSourceName(schema, field).equalsIgnoreCase(partitionColumnName)) {
178+
return field;
179+
}
180+
}
181+
return null;
182+
}
183+
184+
173185
public long nextPartitionId() {
174186
return partitionIdGen.getAndIncrement();
175187
}

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionTransform.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public enum IcebergPartitionTransform {
2525
UNKNOWN;
2626

2727
public static IcebergPartitionTransform fromString(String partitionTransform) {
28+
partitionTransform = partitionTransform.replaceAll("\\[.*\\]$", "");
2829
try {
2930
return IcebergPartitionTransform.valueOf(partitionTransform.toUpperCase());
3031
} catch (IllegalArgumentException e) {

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergPartitionUtils.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
package com.starrocks.connector.iceberg;
1616

1717
import com.google.common.base.Objects;
18+
import com.google.common.base.Preconditions;
1819
import com.google.common.collect.ImmutableSet;
1920
import com.starrocks.catalog.Column;
2021
import com.starrocks.catalog.IcebergTable;
2122
import com.starrocks.catalog.Type;
2223
import com.starrocks.common.util.TimeUtils;
2324
import com.starrocks.connector.PartitionUtil;
2425
import com.starrocks.connector.exception.StarRocksConnectorException;
26+
import com.starrocks.statistic.StatisticUtils;
2527
import org.apache.iceberg.AddedRowsScanTask;
2628
import org.apache.iceberg.ChangelogOperation;
2729
import org.apache.iceberg.ChangelogScanTask;
@@ -234,4 +236,83 @@ public static PartitionUtil.DateTimeInterval getDateTimeIntervalFromIceberg(Iceb
234236
return PartitionUtil.DateTimeInterval.NONE;
235237
}
236238
}
239+
240+
public static boolean isSupportedConvertPartitionTransform(IcebergPartitionTransform transform) {
241+
return transform == IcebergPartitionTransform.IDENTITY ||
242+
transform == IcebergPartitionTransform.YEAR ||
243+
transform == IcebergPartitionTransform.MONTH ||
244+
transform == IcebergPartitionTransform.DAY ||
245+
transform == IcebergPartitionTransform.HOUR;
246+
}
247+
248+
public static LocalDateTime addDateTimeInterval(LocalDateTime dateTime, IcebergPartitionTransform transform) {
249+
switch (transform) {
250+
case YEAR:
251+
return dateTime.plusYears(1);
252+
case MONTH:
253+
return dateTime.plusMonths(1);
254+
case DAY:
255+
return dateTime.plusDays(1);
256+
case HOUR:
257+
return dateTime.plusHours(1);
258+
default:
259+
throw new StarRocksConnectorException("Unsupported partition transform to add: %s", transform);
260+
}
261+
}
262+
263+
/**
264+
convert partition value to predicate
265+
eg.
266+
partitionColumn: ts(date)
267+
partitionValue: 2023 transform: year
268+
return ts >= '2023-01-01' and ts < '2024-01-01'
269+
partitionValue: 2023-01 transform: month
270+
return ts >= '2023-01-01' and ts < '2023-02-01'
271+
partitionValue: 2023-01-01 transform: day
272+
return ts >= '2023-01-01' and ts < '2023-01-02'
273+
274+
partitionColumn: ts(datetime) transform: year
275+
partitionValue: 2023 transform: year
276+
return ts >= '2023-01-01 00:00:00' and ts < '2024-01-01 00:00:00'
277+
partitionValue: 2023-01 transform: month
278+
return ts >= '2023-01-01 00:00:00' and ts < '2023-02-01 00:00:00'
279+
partitionValue: 2023-01-01 transform: day
280+
return ts >= '2023-01-01 00:00:00' and ts < '2023-01-02 00:00:00'
281+
partitionValue: 2023-01-01-12 transform: hour
282+
return ts >= '2023-01-01 12:00:00' and ts < '2023-01-01 13:00:00'
283+
*/
284+
public static String convertPartitionFieldToPredicate(IcebergTable table, String partitionColumn,
285+
String partitionValue) {
286+
PartitionField partitionField = table.getPartitionFiled(partitionColumn);
287+
if (partitionField == null) {
288+
throw new StarRocksConnectorException("Partition column %s not found in table %s.%s.%s",
289+
partitionColumn, table.getCatalogName(), table.getRemoteDbName(), table.getRemoteTableName());
290+
}
291+
IcebergPartitionTransform transform = IcebergPartitionTransform.fromString(partitionField.transform().toString());
292+
if (transform == IcebergPartitionTransform.IDENTITY) {
293+
return StatisticUtils.quoting(partitionColumn) + " = '" + partitionValue + "'";
294+
} else {
295+
// transform is year, month, day, hour
296+
Type partitiopnColumnType = table.getColumn(partitionColumn).getType();
297+
Preconditions.checkState(partitiopnColumnType.isDateType(),
298+
"Partition column %s type must be date or datetime", partitionColumn);
299+
String normalizedPartitionValue = normalizeTimePartitionName(partitionValue, partitionField,
300+
table.getNativeTable().schema(), partitiopnColumnType);
301+
302+
LocalDateTime startDateTime = null;
303+
DateTimeFormatter dateTimeFormatter = null;
304+
if (partitiopnColumnType.isDate()) {
305+
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
306+
startDateTime = LocalDate.parse(normalizedPartitionValue, dateTimeFormatter).atStartOfDay();
307+
} else {
308+
dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
309+
startDateTime = LocalDateTime.parse(normalizedPartitionValue, dateTimeFormatter);
310+
}
311+
LocalDateTime endDateTime = addDateTimeInterval(startDateTime, transform);
312+
String endDateTimeStr = endDateTime.format(dateTimeFormatter);
313+
314+
return StatisticUtils.quoting(partitionColumn) + " >= '" + normalizedPartitionValue + "' and " +
315+
StatisticUtils.quoting(partitionColumn) + " < '" + endDateTimeStr + "'";
316+
}
317+
}
237318
}

fe/fe-core/src/main/java/com/starrocks/statistic/ExternalFullStatisticsCollectJob.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,19 @@
2323
import com.starrocks.analysis.TableName;
2424
import com.starrocks.catalog.Column;
2525
import com.starrocks.catalog.Database;
26+
import com.starrocks.catalog.IcebergTable;
2627
import com.starrocks.catalog.Table;
2728
import com.starrocks.catalog.Type;
2829
import com.starrocks.common.Config;
2930
import com.starrocks.common.DdlException;
3031
import com.starrocks.common.util.DebugUtil;
3132
import com.starrocks.common.util.UUIDUtil;
3233
import com.starrocks.connector.PartitionUtil;
34+
import com.starrocks.connector.exception.StarRocksConnectorException;
3335
import com.starrocks.connector.hive.HiveMetaClient;
3436
import com.starrocks.connector.iceberg.IcebergApiConverter;
37+
import com.starrocks.connector.iceberg.IcebergPartitionTransform;
38+
import com.starrocks.connector.iceberg.IcebergPartitionUtils;
3539
import com.starrocks.qe.ConnectContext;
3640
import com.starrocks.qe.OriginStatement;
3741
import com.starrocks.qe.QueryState;
@@ -44,6 +48,7 @@
4448
import com.starrocks.thrift.TStatisticData;
4549
import org.apache.commons.lang.StringEscapeUtils;
4650
import org.apache.commons.lang.StringUtils;
51+
import org.apache.iceberg.PartitionField;
4752
import org.apache.logging.log4j.LogManager;
4853
import org.apache.logging.log4j.Logger;
4954
import org.apache.velocity.VelocityContext;
@@ -179,6 +184,9 @@ private String buildBatchCollectFullStatisticSQL(Table table, String partitionNa
179184
String partitionValue = partitionValues.get(i);
180185
if (partitionValue.equals(nullValue)) {
181186
partitionPredicate.add(StatisticUtils.quoting(partitionColumnName) + " IS NULL");
187+
} else if (isSupportedPartitionTransform(partitionColumnName)) {
188+
partitionPredicate.add(IcebergPartitionUtils.convertPartitionFieldToPredicate((IcebergTable) table,
189+
partitionColumnName, partitionValue));
182190
} else {
183191
partitionPredicate.add(StatisticUtils.quoting(partitionColumnName) + " = '" + partitionValue + "'");
184192
}
@@ -190,6 +198,31 @@ private String buildBatchCollectFullStatisticSQL(Table table, String partitionNa
190198
return builder.toString();
191199
}
192200

201+
// only iceberg table support partition transform
202+
// now only support identity/year/month/day/hour transform
203+
boolean isSupportedPartitionTransform(String partitionColumn) {
204+
// only iceberg table support partition transform
205+
if (!table.isIcebergTable()) {
206+
return false;
207+
}
208+
IcebergTable icebergTable = (IcebergTable) table;
209+
PartitionField partitionField = icebergTable.getPartitionField(partitionColumn);
210+
if (partitionField == null) {
211+
LOG.warn("Partition column {} not found in table {}", partitionColumn, table.getName());
212+
throw new StarRocksConnectorException("Partition column " + partitionColumn + " not found in table " +
213+
table.getName());
214+
}
215+
216+
IcebergPartitionTransform transform = IcebergPartitionTransform.fromString(partitionField.transform().toString());
217+
if (!IcebergPartitionUtils.isSupportedConvertPartitionTransform(transform)) {
218+
LOG.warn("Partition transform {} not supported to analyze, table: {}", transform, table.getName());
219+
throw new StarRocksConnectorException("Partition transform " + transform + " not supported to analyze, " +
220+
"table: " + table.getName());
221+
}
222+
223+
return true;
224+
}
225+
193226
@Override
194227
public void collectStatisticSync(String sql, ConnectContext context) throws Exception {
195228
LOG.debug("statistics collect sql : " + sql);

fe/fe-core/src/test/java/com/starrocks/connector/iceberg/MockIcebergMetadata.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public class MockIcebergMetadata implements ConnectorMetadata {
7777
public static final String MOCKED_PARTITIONED_DAY_TABLE_NAME = "t0_day";
7878
public static final String MOCKED_PARTITIONED_HOUR_TABLE_NAME = "t0_hour";
7979
public static final String MOCKED_PARTITIONED_BUCKET_TABLE_NAME = "t0_bucket";
80+
// Partition-transform tables whose partition source column "ts" is a timestamp with time zone.
// The "tz" name suffix is what getIcebergPartitionTransformSchema keys on to pick that column type.
81+
public static final String MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME = "t0_year_tz";
82+
public static final String MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME = "t0_month_tz";
83+
public static final String MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME = "t0_day_tz";
84+
public static final String MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME = "t0_hour_tz";
8085

8186
private static final List<String> PARTITION_TABLE_NAMES = ImmutableList.of(MOCKED_PARTITIONED_TABLE_NAME1,
8287
MOCKED_STRING_PARTITIONED_TABLE_NAME1, MOCKED_STRING_PARTITIONED_TABLE_NAME2,
@@ -85,7 +90,9 @@ public class MockIcebergMetadata implements ConnectorMetadata {
8590
private static final List<String> PARTITION_TRANSFORM_TABLE_NAMES =
8691
ImmutableList.of(MOCKED_PARTITIONED_YEAR_TABLE_NAME, MOCKED_PARTITIONED_MONTH_TABLE_NAME,
8792
MOCKED_PARTITIONED_DAY_TABLE_NAME, MOCKED_PARTITIONED_HOUR_TABLE_NAME,
88-
MOCKED_PARTITIONED_BUCKET_TABLE_NAME);
93+
MOCKED_PARTITIONED_BUCKET_TABLE_NAME,
94+
MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME, MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME,
95+
MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME, MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME);
8996

9097
private static final List<String> PARTITION_NAMES_0 = Lists.newArrayList("date=2020-01-01",
9198
"date=2020-01-02",
@@ -218,9 +225,15 @@ private static Schema getIcebergPartitionSchema(String tblName) {
218225
}
219226

220227
private static Schema getIcebergPartitionTransformSchema(String tblName) {
221-
return new Schema(required(3, "id", Types.IntegerType.get()),
222-
required(4, "data", Types.StringType.get()),
223-
required(5, "ts", Types.TimestampType.withoutZone()));
228+
if (tblName.endsWith("tz")) {
229+
return new Schema(required(3, "id", Types.IntegerType.get()),
230+
required(4, "data", Types.StringType.get()),
231+
required(5, "ts", Types.TimestampType.withZone()));
232+
} else {
233+
return new Schema(required(3, "id", Types.IntegerType.get()),
234+
required(4, "data", Types.StringType.get()),
235+
required(5, "ts", Types.TimestampType.withoutZone()));
236+
}
224237
}
225238

226239
private static TestTables.TestTable getPartitionIdentityTable(String tblName, Schema schema) throws IOException {
@@ -284,22 +297,58 @@ private static TestTables.TestTable getPartitionTransformTable(String tblName, S
284297
+ MOCKED_PARTITIONED_BUCKET_TABLE_NAME), MOCKED_PARTITIONED_BUCKET_TABLE_NAME,
285298
schema, spec, 1);
286299
}
300+
case MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME: {
301+
PartitionSpec spec =
302+
PartitionSpec.builderFor(schema).year("ts").build();
303+
return TestTables.create(
304+
new File(getStarRocksHome() + "/" + MOCKED_PARTITIONED_TRANSFORMS_DB_NAME + "/"
305+
+ MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME), MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME,
306+
schema, spec, 1);
307+
}
308+
case MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME: {
309+
PartitionSpec spec =
310+
PartitionSpec.builderFor(schema).month("ts").build();
311+
return TestTables.create(
312+
new File(getStarRocksHome() + "/" + MOCKED_PARTITIONED_TRANSFORMS_DB_NAME + "/"
313+
+ MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME), MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME,
314+
schema, spec, 1);
315+
}
316+
case MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME: {
317+
PartitionSpec spec =
318+
PartitionSpec.builderFor(schema).day("ts").build();
319+
return TestTables.create(
320+
new File(getStarRocksHome() + "/" + MOCKED_PARTITIONED_TRANSFORMS_DB_NAME + "/"
321+
+ MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME), MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME,
322+
schema, spec, 1);
323+
}
324+
case MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME: {
325+
PartitionSpec spec =
326+
PartitionSpec.builderFor(schema).hour("ts").build();
327+
return TestTables.create(
328+
new File(getStarRocksHome() + "/" + MOCKED_PARTITIONED_TRANSFORMS_DB_NAME + "/"
329+
+ MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME), MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME,
330+
schema, spec, 1);
331+
}
287332
}
288333
return null;
289334
}
290335

291336
public static List<String> getTransformTablePartitionNames(String tblName) {
292337
switch (tblName) {
293338
case MOCKED_PARTITIONED_YEAR_TABLE_NAME:
339+
case MOCKED_PARTITIONED_YEAR_TZ_TABLE_NAME:
294340
return Lists.newArrayList("ts_year=2019", "ts_year=2020",
295341
"ts_year=2021", "ts_year=2022", "ts_year=2023");
296342
case MOCKED_PARTITIONED_MONTH_TABLE_NAME:
343+
case MOCKED_PARTITIONED_MONTH_TZ_TABLE_NAME:
297344
return Lists.newArrayList("ts_month=2022-01", "ts_month=2022-02",
298345
"ts_month=2022-03", "ts_month=2022-04", "ts_month=2022-05");
299346
case MOCKED_PARTITIONED_DAY_TABLE_NAME:
347+
case MOCKED_PARTITIONED_DAY_TZ_TABLE_NAME:
300348
return Lists.newArrayList("ts_day=2022-01-01", "ts_day=2022-01-02",
301349
"ts_day=2022-01-03", "ts_day=2022-01-04", "ts_day=2022-01-05");
302350
case MOCKED_PARTITIONED_HOUR_TABLE_NAME:
351+
case MOCKED_PARTITIONED_HOUR_TZ_TABLE_NAME:
303352
return Lists.newArrayList("ts_hour=2022-01-01-00", "ts_hour=2022-01-01-01",
304353
"ts_hour=2022-01-01-02", "ts_hour=2022-01-01-03", "ts_hour=2022-01-01-04");
305354
case MOCKED_PARTITIONED_BUCKET_TABLE_NAME:

0 commit comments

Comments
 (0)