|
40 | 40 | import com.google.common.collect.ImmutableMap;
|
41 | 41 | import com.google.common.collect.Lists;
|
42 | 42 | import com.google.common.collect.Maps;
|
43 |
| -import com.google.common.collect.Range; |
44 | 43 | import com.google.common.collect.Sets;
|
45 | 44 | import com.google.common.reflect.TypeToken;
|
46 | 45 | import com.google.gson.annotations.SerializedName;
|
47 | 46 | import com.starrocks.analysis.BinaryPredicate;
|
48 | 47 | import com.starrocks.analysis.BinaryType;
|
49 | 48 | import com.starrocks.analysis.DateLiteral;
|
50 | 49 | import com.starrocks.analysis.DecimalLiteral;
|
51 |
| -import com.starrocks.analysis.Expr; |
52 |
| -import com.starrocks.analysis.FunctionCallExpr; |
53 | 50 | import com.starrocks.analysis.InPredicate;
|
54 | 51 | import com.starrocks.analysis.IsNullPredicate;
|
55 | 52 | import com.starrocks.analysis.LiteralExpr;
|
56 | 53 | import com.starrocks.analysis.NullLiteral;
|
57 | 54 | import com.starrocks.analysis.Predicate;
|
58 | 55 | import com.starrocks.analysis.SlotRef;
|
59 |
| -import com.starrocks.analysis.StringLiteral; |
60 | 56 | import com.starrocks.catalog.Column;
|
61 | 57 | import com.starrocks.catalog.Database;
|
62 |
| -import com.starrocks.catalog.ExpressionRangePartitionInfoV2; |
63 |
| -import com.starrocks.catalog.FunctionSet; |
64 | 58 | import com.starrocks.catalog.KeysType;
|
65 |
| -import com.starrocks.catalog.ListPartitionInfo; |
66 | 59 | import com.starrocks.catalog.MaterializedIndex;
|
67 | 60 | import com.starrocks.catalog.MaterializedIndexMeta;
|
68 | 61 | import com.starrocks.catalog.OlapTable;
|
69 | 62 | import com.starrocks.catalog.Partition;
|
70 |
| -import com.starrocks.catalog.PartitionInfo; |
71 |
| -import com.starrocks.catalog.PartitionKey; |
72 |
| -import com.starrocks.catalog.PartitionType; |
73 | 63 | import com.starrocks.catalog.PhysicalPartition;
|
74 | 64 | import com.starrocks.catalog.PrimitiveType;
|
75 |
| -import com.starrocks.catalog.RangePartitionInfo; |
76 | 65 | import com.starrocks.catalog.Table;
|
77 | 66 | import com.starrocks.catalog.Type;
|
78 | 67 | import com.starrocks.common.AnalysisException;
|
|
84 | 73 | import com.starrocks.common.Pair;
|
85 | 74 | import com.starrocks.common.io.Text;
|
86 | 75 | import com.starrocks.common.io.Writable;
|
87 |
| -import com.starrocks.common.util.DateUtils; |
88 | 76 | import com.starrocks.common.util.ListComparator;
|
89 | 77 | import com.starrocks.common.util.TimeUtils;
|
90 | 78 | import com.starrocks.lake.delete.LakeDeleteJob;
|
|
95 | 83 | import com.starrocks.persist.metablock.SRMetaBlockID;
|
96 | 84 | import com.starrocks.persist.metablock.SRMetaBlockReader;
|
97 | 85 | import com.starrocks.persist.metablock.SRMetaBlockWriter;
|
98 |
| -import com.starrocks.planner.PartitionColumnFilter; |
99 |
| -import com.starrocks.planner.RangePartitionPruner; |
| 86 | +import com.starrocks.qe.ConnectContext; |
100 | 87 | import com.starrocks.qe.QueryState;
|
101 | 88 | import com.starrocks.qe.QueryStateException;
|
102 | 89 | import com.starrocks.server.GlobalStateMgr;
|
103 | 90 | import com.starrocks.service.FrontendOptions;
|
| 91 | +import com.starrocks.sql.StatementPlanner; |
| 92 | +import com.starrocks.sql.analyzer.Analyzer; |
104 | 93 | import com.starrocks.sql.analyzer.DeleteAnalyzer;
|
105 | 94 | import com.starrocks.sql.ast.DeleteStmt;
|
| 95 | +import com.starrocks.sql.ast.StatementBase; |
| 96 | +import com.starrocks.sql.optimizer.Utils; |
| 97 | +import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator; |
| 98 | +import com.starrocks.sql.parser.SqlParser; |
| 99 | +import com.starrocks.sql.plan.ExecPlan; |
106 | 100 | import com.starrocks.transaction.BeginTransactionException;
|
107 | 101 | import com.starrocks.transaction.TransactionState;
|
108 | 102 | import com.starrocks.transaction.TransactionState.TxnCoordinator;
|
109 | 103 | import com.starrocks.transaction.TransactionState.TxnSourceType;
|
110 |
| -import org.apache.commons.lang.StringUtils; |
| 104 | +import org.apache.commons.collections4.CollectionUtils; |
| 105 | +import org.apache.commons.collections4.ListUtils; |
111 | 106 | import org.apache.logging.log4j.LogManager;
|
112 | 107 | import org.apache.logging.log4j.Logger;
|
113 | 108 |
|
|
116 | 111 | import java.io.DataOutputStream;
|
117 | 112 | import java.io.IOException;
|
118 | 113 | import java.nio.charset.StandardCharsets;
|
119 |
| -import java.time.LocalDate; |
120 |
| -import java.time.format.DateTimeFormatterBuilder; |
121 |
| -import java.time.format.ResolverStyle; |
122 |
| -import java.util.ArrayList; |
123 |
| -import java.util.Collection; |
124 | 114 | import java.util.Iterator;
|
125 | 115 | import java.util.LinkedList;
|
126 | 116 | import java.util.List;
|
@@ -241,28 +231,11 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
|
241 | 231 | Preconditions.checkState(partitionNames != null);
|
242 | 232 | boolean noPartitionSpecified = partitionNames.isEmpty();
|
243 | 233 | if (noPartitionSpecified) {
|
244 |
| - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); |
245 |
| - if (partitionInfo.isRangePartition()) { |
246 |
| - partitionNames = extractPartitionNamesByCondition(olapTable, conditions); |
247 |
| - if (partitionNames.isEmpty()) { |
248 |
| - LOG.info("The delete statement [{}] prunes all partitions", |
249 |
| - stmt.getOrigStmt().originStmt); |
250 |
| - return null; |
251 |
| - } |
252 |
| - } else if (partitionInfo.getType() == PartitionType.UNPARTITIONED) { |
253 |
| - // this is a unpartitioned table, use table name as partition name |
254 |
| - partitionNames.add(olapTable.getName()); |
255 |
| - } else if (partitionInfo.getType() == PartitionType.LIST) { |
256 |
| - // TODO: support list partition prune |
257 |
| - ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo; |
258 |
| - List<Long> partitionIds = listPartitionInfo.getPartitionIds(false); |
259 |
| - if (partitionIds.isEmpty()) { |
260 |
| - return null; |
261 |
| - } |
262 |
| - for (Long partitionId : partitionIds) { |
263 |
| - Partition partition = olapTable.getPartition(partitionId); |
264 |
| - partitionNames.add(partition.getName()); |
265 |
| - } |
| 234 | + partitionNames = partitionPruneForDelete(stmt, olapTable); |
| 235 | + |
| 236 | + if (partitionNames.isEmpty()) { |
| 237 | + LOG.info("The delete statement [{}] prunes all partitions", stmt.getOrigStmt().originStmt); |
| 238 | + return null; |
266 | 239 | }
|
267 | 240 | }
|
268 | 241 |
|
@@ -316,140 +289,41 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
|
316 | 289 | }
|
317 | 290 |
|
318 | 291 | @VisibleForTesting
|
319 |
| - public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) |
320 |
| - throws DdlException, AnalysisException { |
321 |
| - return extractPartitionNamesByCondition(olapTable, stmt.getDeleteConditions()); |
| 292 | + public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) throws DdlException { |
| 293 | + return partitionPruneForDelete(stmt, olapTable); |
322 | 294 | }
|
323 | 295 |
|
324 |
| - public List<String> extractPartitionNamesByCondition(OlapTable olapTable, List<Predicate> conditions) |
325 |
| - throws DdlException, AnalysisException { |
326 |
| - List<String> partitionNames = Lists.newArrayList(); |
327 |
| - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); |
328 |
| - RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; |
329 |
| - Map<String, PartitionColumnFilter> columnFilters = extractColumnFilter(olapTable, |
330 |
| - rangePartitionInfo, conditions); |
331 |
| - Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false); |
332 |
| - if (columnFilters.isEmpty()) { |
333 |
| - partitionNames.addAll(olapTable.getPartitionNames()); |
334 |
| - } else { |
335 |
| - RangePartitionPruner pruner = new RangePartitionPruner(keyRangeById, |
336 |
| - rangePartitionInfo.getPartitionColumns(), columnFilters); |
337 |
| - Collection<Long> selectedPartitionIds = pruner.prune(); |
338 |
| - |
339 |
| - if (selectedPartitionIds == null) { |
340 |
| - partitionNames.addAll(olapTable.getPartitionNames()); |
341 |
| - } else { |
342 |
| - for (Long partitionId : selectedPartitionIds) { |
343 |
| - Partition partition = olapTable.getPartition(partitionId); |
344 |
| - partitionNames.add(partition.getName()); |
345 |
| - } |
346 |
| - } |
347 |
| - } |
348 |
| - return partitionNames; |
349 |
| - } |
350 |
| - |
351 |
| - private Map<String, PartitionColumnFilter> extractColumnFilter(Table table, RangePartitionInfo rangePartitionInfo, |
352 |
| - List<Predicate> conditions) |
353 |
| - throws DdlException, AnalysisException { |
354 |
| - Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap(); |
355 |
| - List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns(); |
356 |
| - List<Predicate> deleteConditions = conditions; |
357 |
| - Map<String, Column> nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); |
358 |
| - for (Column column : table.getBaseSchema()) { |
359 |
| - nameToColumn.put(column.getName(), column); |
360 |
| - } |
361 |
| - for (Predicate condition : deleteConditions) { |
362 |
| - SlotRef slotRef = (SlotRef) condition.getChild(0); |
363 |
| - String columnName = slotRef.getColumnName(); |
364 |
| - |
365 |
| - // filter condition is not partition column; |
366 |
| - if (partitionColumns.stream().noneMatch(e -> e.getName().equals(columnName))) { |
367 |
| - continue; |
368 |
| - } |
369 |
| - |
370 |
| - if (!nameToColumn.containsKey(columnName)) { |
371 |
| - ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName()); |
372 |
| - } |
373 |
| - if (condition instanceof BinaryPredicate) { |
374 |
| - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; |
375 |
| - LiteralExpr literalExpr = (LiteralExpr) binaryPredicate.getChild(1); |
376 |
| - Column column = nameToColumn.get(columnName); |
377 |
| - |
378 |
| - if (table.isExprPartitionTable() && !rangePartitionInfo.isAutomaticPartition()) { |
379 |
| - ExpressionRangePartitionInfoV2 expressionRangePartitionInfoV2 = |
380 |
| - (ExpressionRangePartitionInfoV2) rangePartitionInfo; |
381 |
| - Expr partitionExpr = expressionRangePartitionInfoV2.getPartitionExprs().get(0); |
382 |
| - List<FunctionCallExpr> functionCallExprs = new ArrayList<>(); |
383 |
| - partitionExpr.collect((com.google.common.base.Predicate<Expr>) arg -> arg instanceof FunctionCallExpr, |
384 |
| - functionCallExprs); |
385 |
| - FunctionCallExpr functionCallExpr = functionCallExprs.get(0); |
386 |
| - String funcName = functionCallExpr.getFnName().getFunction(); |
387 |
| - if (funcName.equalsIgnoreCase(FunctionSet.STR2DATE)) { |
388 |
| - String format = ((StringLiteral) functionCallExpr.getChild(1)).getValue(); |
389 |
| - DateTimeFormatterBuilder builder = DateUtils.unixDatetimeFormatBuilder(format, false); |
390 |
| - LocalDate ld = LocalDate.from(builder.toFormatter().withResolverStyle(ResolverStyle.STRICT).parse( |
391 |
| - StringUtils.strip(literalExpr.getStringValue(), "\r\n\t "))); |
392 |
| - literalExpr = new DateLiteral(ld.atTime(0, 0, 0), Type.DATE); |
393 |
| - } else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME)) { |
394 |
| - literalExpr = LiteralExpr.create(DateUtils.formatTimestampInSeconds( |
395 |
| - Long.parseLong(literalExpr.getStringValue())), Type.DATETIME); |
396 |
| - } else if (funcName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME_MS)) { |
397 |
| - literalExpr = LiteralExpr.create(DateUtils.formatTimeStampInMill( |
398 |
| - Long.parseLong(literalExpr.getStringValue()), TimeUtils.getSystemTimeZone().toZoneId()), |
399 |
| - Type.DATETIME); |
400 |
| - } |
401 |
| - } else { |
402 |
| - literalExpr = LiteralExpr.create(literalExpr.getStringValue(), |
403 |
| - Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType()))); |
404 |
| - } |
405 |
| - |
406 |
| - PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(), |
407 |
| - new PartitionColumnFilter()); |
408 |
| - switch (binaryPredicate.getOp()) { |
409 |
| - case EQ: |
410 |
| - filter.setLowerBound(literalExpr, true); |
411 |
| - filter.setUpperBound(literalExpr, true); |
412 |
| - break; |
413 |
| - case LE: |
414 |
| - filter.setUpperBound(literalExpr, true); |
415 |
| - filter.lowerBoundInclusive = true; |
416 |
| - break; |
417 |
| - case LT: |
418 |
| - filter.setUpperBound(literalExpr, false); |
419 |
| - filter.lowerBoundInclusive = true; |
420 |
| - break; |
421 |
| - case GE: |
422 |
| - filter.setLowerBound(literalExpr, true); |
423 |
| - break; |
424 |
| - case GT: |
425 |
| - filter.setLowerBound(literalExpr, false); |
426 |
| - break; |
427 |
| - default: |
428 |
| - break; |
429 |
| - } |
430 |
| - columnFilters.put(slotRef.getColumnName(), filter); |
431 |
| - } else if (condition instanceof InPredicate) { |
432 |
| - InPredicate inPredicate = (InPredicate) condition; |
433 |
| - if (inPredicate.isNotIn()) { |
434 |
| - continue; |
435 |
| - } |
436 |
| - List<LiteralExpr> list = Lists.newArrayList(); |
437 |
| - Column column = nameToColumn.get(columnName); |
438 |
| - for (int i = 1; i < inPredicate.getChildren().size(); i++) { |
439 |
| - LiteralExpr literalExpr = (LiteralExpr) inPredicate.getChild(i); |
440 |
| - literalExpr = LiteralExpr.create(literalExpr.getStringValue(), |
441 |
| - Objects.requireNonNull(Type.fromPrimitiveType(column.getPrimitiveType()))); |
442 |
| - list.add(literalExpr); |
443 |
| - } |
444 |
| - |
445 |
| - PartitionColumnFilter filter = columnFilters.getOrDefault(slotRef.getColumnName(), |
446 |
| - new PartitionColumnFilter()); |
447 |
| - filter.setInPredicateLiterals(list); |
448 |
| - columnFilters.put(slotRef.getColumnName(), filter); |
| 296 | + /** |
| 297 | + * Construct a fake sql then leverage the optimizer to prune partitions |
| 298 | + * |
| 299 | + * @return pruned partitions with delete conditions |
| 300 | + */ |
| 301 | + private List<String> partitionPruneForDelete(DeleteStmt stmt, OlapTable table) { |
| 302 | + String tableName = stmt.getTableName().toSql(); |
| 303 | + String predicate = stmt.getWherePredicate().toSql(); |
| 304 | + String fakeSql = String.format("SELECT * FROM %s WHERE %s", tableName, predicate); |
| 305 | + PhysicalOlapScanOperator physicalOlapScanOperator; |
| 306 | + try { |
| 307 | + List<StatementBase> parse = SqlParser.parse(fakeSql, ConnectContext.get().getSessionVariable()); |
| 308 | + StatementBase selectStmt = parse.get(0); |
| 309 | + Analyzer.analyze(selectStmt, ConnectContext.get()); |
| 310 | + ExecPlan plan = StatementPlanner.plan(selectStmt, ConnectContext.get()); |
| 311 | + List<PhysicalOlapScanOperator> physicalOlapScanOperators = |
| 312 | + Utils.extractPhysicalOlapScanOperator(plan.getPhysicalPlan()); |
| 313 | + // it's supposed to be empty set |
| 314 | + if (CollectionUtils.isEmpty(physicalOlapScanOperators)) { |
| 315 | + return Lists.newArrayList(); |
449 | 316 | }
|
450 |
| - |
| 317 | + physicalOlapScanOperator = physicalOlapScanOperators.get(0); |
| 318 | + } catch (Exception e) { |
| 319 | + LOG.warn("failed to do partition pruning for delete {}", stmt.toString(), e); |
| 320 | + return Lists.newArrayList(table.getVisiblePartitionNames()); |
451 | 321 | }
|
452 |
| - return columnFilters; |
| 322 | + List<Long> selectedPartitionId = physicalOlapScanOperator.getSelectedPartitionId(); |
| 323 | + return ListUtils.emptyIfNull(selectedPartitionId) |
| 324 | + .stream() |
| 325 | + .map(x -> table.getPartition(x).getName()) |
| 326 | + .collect(Collectors.toList()); |
453 | 327 | }
|
454 | 328 |
|
455 | 329 | /**
|
|
0 commit comments