Skip to content

Commit b43e28a

Browse files
[BugFix] AggTable's metric type not match corresponding aggregate function's return type (backport #58407) (#58418)
Signed-off-by: satanson <[email protected]> Co-authored-by: satanson <[email protected]>
1 parent 4a11423 commit b43e28a

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/RemoveAggregationFromAggTable.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@
3333
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
3434
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
3535
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
36+
import com.starrocks.sql.optimizer.operator.scalar.CastOperator;
3637
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
3738
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
3839
import com.starrocks.sql.optimizer.rewrite.ReplaceColumnRefRewriter;
40+
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorRewriter;
41+
import com.starrocks.sql.optimizer.rewrite.scalar.ReduceCastRule;
3942
import com.starrocks.sql.optimizer.rule.RuleType;
4043

4144
import java.util.List;
@@ -138,7 +141,23 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
138141
aggregationOperator.getGroupingKeys().forEach(g -> projectMap.put(g, g));
139142
for (Map.Entry<ColumnRefOperator, CallOperator> entry : aggregationOperator.getAggregations().entrySet()) {
140143
// in this case, CallOperator must have only one child ColumnRefOperator
141-
projectMap.put(entry.getKey(), entry.getValue().getChild(0));
144+
//
145+
// in AggTable,may metric definition is `col int sum`, but in query sum(col)'s type is bigint,
146+
// so after replace sum(col) wit col, we must guarantee the same ColumnRefOperator points to
147+
// the same type expr, so we must cast col as bigint, otherwise during executing, ExchangeSink would
148+
// send serialize the column as int column while the ExchangeSource would derserialize it in bigint
149+
// column.
150+
ColumnRefOperator columnRef = entry.getKey();
151+
CallOperator aggFunc = entry.getValue();
152+
ScalarOperator aggFuncArg = aggFunc.getChild(0);
153+
if (aggFunc.getType().equals(aggFuncArg.getType())) {
154+
projectMap.put(columnRef, aggFuncArg);
155+
} else {
156+
ScalarOperator newExpr = new ScalarOperatorRewriter().rewrite(
157+
new CastOperator(aggFunc.getType(), aggFuncArg, true),
158+
Lists.newArrayList(new ReduceCastRule()));
159+
projectMap.put(columnRef, newExpr);
160+
}
142161
}
143162

144163
Map<ColumnRefOperator, ScalarOperator> newProjectMap = Maps.newHashMap();

fe/fe-core/src/test/java/com/starrocks/planner/PreAggregationTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ public static void beforeClass() throws Exception {
5656
"PROPERTIES (\n" +
5757
" \"replication_num\" = \"1\"\n" +
5858
");");
59+
60+
starRocksAssert.withTable("CREATE TABLE IF NOT EXISTS `test_agg_3` (\n" +
61+
" `k1` int(11) NULL,\n" +
62+
" `v1` int SUM NULL\n" +
63+
") ENGINE=OLAP\n" +
64+
"AGGREGATE KEY(`k1`)\n" +
65+
"DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" +
66+
"PROPERTIES (\n" +
67+
" \"replication_num\" = \"1\"\n" +
68+
");");
5969
}
6070

6171
public String getFragmentPlan(String sql) throws Exception {
@@ -112,4 +122,24 @@ public void testPreAggregationCaseWhen() throws Exception {
112122
" TABLE: test_agg_2\n" +
113123
" PREAGGREGATION: ON\n");
114124
}
125+
126+
@Test
127+
public void testMetricTypeOfAggTableNotMatchAggragationReturnType() throws Exception {
128+
String sql = "select \n" +
129+
"col1, \n" +
130+
"col2 \n" +
131+
"from (\n" +
132+
"select k1 col1, \n" +
133+
"IFNULL(SUM(v1),0) col2 \n" +
134+
"from test_agg_3 \n" +
135+
"group by k1\n" +
136+
")tmp \n" +
137+
"where 1=1 and col2 > 1 \n" +
138+
"order by col1 \n" +
139+
"asc limit 0,5";
140+
String plan = getFragmentPlan(sql);
141+
assertContains(plan, " 1:Project\n" +
142+
" | <slot 1> : 1: k1\n" +
143+
" | <slot 4> : ifnull(CAST(2: v1 AS BIGINT), 0)");
144+
}
115145
}

0 commit comments

Comments
 (0)