|
18 | 18 |
|
19 | 19 | import com.alibaba.graphscope.common.config.Configs;
|
20 | 20 | import com.alibaba.graphscope.common.config.PegasusConfig;
|
21 |
| -import com.alibaba.graphscope.common.intermediate.ArgUtils; |
22 | 21 | import com.alibaba.graphscope.common.ir.rel.GraphLogicalAggregate;
|
23 | 22 | import com.alibaba.graphscope.common.ir.rel.GraphLogicalProject;
|
24 | 23 | import com.alibaba.graphscope.common.ir.rel.GraphLogicalSort;
|
|
29 | 28 | import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalSource;
|
30 | 29 | import com.alibaba.graphscope.common.ir.rel.graph.match.GraphLogicalMultiMatch;
|
31 | 30 | import com.alibaba.graphscope.common.ir.rel.graph.match.GraphLogicalSingleMatch;
|
32 |
| -import com.alibaba.graphscope.common.ir.rel.type.group.GraphGroupKeys; |
33 |
| -import com.alibaba.graphscope.common.ir.rex.RexGraphVariable; |
34 | 31 | import com.alibaba.graphscope.common.ir.runtime.RegularPhysicalBuilder;
|
35 |
| -import com.alibaba.graphscope.common.ir.runtime.proto.RexToProtoConverter; |
36 | 32 | import com.alibaba.graphscope.common.ir.runtime.type.PhysicalNode;
|
37 |
| -import com.alibaba.graphscope.common.ir.tools.AliasInference; |
38 | 33 | import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
|
39 |
| -import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; |
40 | 34 | import com.alibaba.graphscope.common.jna.IrCoreLibrary;
|
41 | 35 | import com.alibaba.graphscope.common.jna.type.FfiData;
|
42 |
| -import com.alibaba.graphscope.common.jna.type.FfiPbPointer; |
43 | 36 | import com.alibaba.graphscope.common.jna.type.FfiResult;
|
44 | 37 | import com.alibaba.graphscope.common.jna.type.ResultCode;
|
45 | 38 | import com.alibaba.graphscope.common.store.IrMeta;
|
46 |
| -import com.alibaba.graphscope.gaia.proto.OuterExpression; |
47 | 39 | import com.google.common.base.Preconditions;
|
48 | 40 | import com.sun.jna.Pointer;
|
49 | 41 | import com.sun.jna.ptr.IntByReference;
|
50 | 42 |
|
51 | 43 | import org.apache.calcite.rel.RelNode;
|
52 | 44 | import org.apache.calcite.rel.logical.LogicalFilter;
|
53 |
| -import org.apache.calcite.rel.type.RelDataTypeField; |
54 |
| -import org.apache.calcite.rex.RexNode; |
55 |
| -import org.apache.calcite.rex.RexVariable; |
56 | 45 |
|
57 | 46 | import java.util.List;
|
58 | 47 |
|
@@ -106,7 +95,7 @@ public void appendNode(PhysicalNode<Pointer> node) {
|
106 | 95 | } else if (original instanceof GraphLogicalAggregate) {
|
107 | 96 | // transform aggregate to project + dedup by key
|
108 | 97 | if (((GraphLogicalAggregate) original).getAggCalls().isEmpty()) {
|
109 |
| - appendProjectDedup((GraphLogicalAggregate) original, oprIdx); |
| 98 | + appendProjectDedup(node, oprIdx); |
110 | 99 | } else {
|
111 | 100 | checkFfiResult(
|
112 | 101 | LIB.appendGroupbyOperator(
|
@@ -164,60 +153,25 @@ private void checkFfiResult(FfiResult res) {
|
164 | 153 | }
|
165 | 154 | }
|
166 | 155 |
|
167 |
| - private boolean isColumnId() { |
168 |
| - return this.irMeta.getSchema().isColumnId(); |
169 |
| - } |
170 |
| - |
171 | 156 | private void appendMatch(PhysicalNode<Pointer> node, IntByReference oprIdx) {
|
172 |
| - // append dummy source |
173 |
| - Pointer ptrScan = LIB.initScanOperator(Utils.ffiScanOpt(GraphOpt.Source.VERTEX)); |
174 |
| - checkFfiResult(LIB.appendScanOperator(ptrPlan, ptrScan, oprIdx.getValue(), oprIdx)); |
| 157 | + List<Pointer> ffiNodes = node.getNodes(); |
| 158 | + Preconditions.checkArgument( |
| 159 | + ffiNodes.size() == 2, |
| 160 | + "should have 2 ffi nodes, one is `scan` and the other is `match`"); |
| 161 | + checkFfiResult(LIB.appendScanOperator(ptrPlan, ffiNodes.get(0), oprIdx.getValue(), oprIdx)); |
175 | 162 | checkFfiResult(
|
176 |
| - LIB.appendPatternOperator(ptrPlan, node.getNode(), oprIdx.getValue(), oprIdx)); |
| 163 | + LIB.appendPatternOperator(ptrPlan, ffiNodes.get(1), oprIdx.getValue(), oprIdx)); |
177 | 164 | }
|
178 | 165 |
|
179 |
| - private void appendProjectDedup(GraphLogicalAggregate aggregate, IntByReference oprIdx) { |
180 |
| - GraphGroupKeys keys = aggregate.getGroupKey(); |
| 166 | + private void appendProjectDedup(PhysicalNode<Pointer> node, IntByReference oprIdx) { |
| 167 | + List<Pointer> ffiNodes = node.getNodes(); |
181 | 168 | Preconditions.checkArgument(
|
182 |
| - keys.groupKeyCount() > 0 && aggregate.getAggCalls().isEmpty(), |
183 |
| - "group keys should not be empty while group calls should be empty if need dedup"); |
184 |
| - List<RelDataTypeField> fields = aggregate.getRowType().getFieldList(); |
185 |
| - Pointer ptrProject = LIB.initProjectOperator(false); |
186 |
| - for (int i = 0; i < keys.groupKeyCount(); ++i) { |
187 |
| - RexNode var = keys.getVariables().get(i); |
188 |
| - Preconditions.checkArgument( |
189 |
| - var instanceof RexGraphVariable, |
190 |
| - "each group key should be type %s, but is %s", |
191 |
| - RexGraphVariable.class, |
192 |
| - var.getClass()); |
193 |
| - OuterExpression.Expression expr = |
194 |
| - var.accept(new RexToProtoConverter(true, isColumnId())); |
195 |
| - int aliasId; |
196 |
| - if (i >= fields.size() |
197 |
| - || (aliasId = fields.get(i).getIndex()) == AliasInference.DEFAULT_ID) { |
198 |
| - throw new IllegalArgumentException( |
199 |
| - "each group key should have an alias if need dedup"); |
200 |
| - } |
201 |
| - checkFfiResult( |
202 |
| - LIB.addProjectExprPbAlias( |
203 |
| - ptrProject, |
204 |
| - new FfiPbPointer.ByValue(expr.toByteArray()), |
205 |
| - ArgUtils.asAlias(aliasId))); |
206 |
| - } |
207 |
| - Pointer ptrDedup = LIB.initDedupOperator(); |
208 |
| - for (int i = 0; i < keys.groupKeyCount(); ++i) { |
209 |
| - RelDataTypeField field = fields.get(i); |
210 |
| - RexVariable rexVar = |
211 |
| - RexGraphVariable.of(field.getIndex(), field.getName(), field.getType()); |
212 |
| - OuterExpression.Variable exprVar = |
213 |
| - rexVar.accept(new RexToProtoConverter(true, isColumnId())) |
214 |
| - .getOperators(0) |
215 |
| - .getVar(); |
216 |
| - checkFfiResult( |
217 |
| - LIB.addDedupKeyPb(ptrDedup, new FfiPbPointer.ByValue(exprVar.toByteArray()))); |
218 |
| - } |
219 |
| - checkFfiResult(LIB.appendProjectOperator(ptrPlan, ptrProject, oprIdx.getValue(), oprIdx)); |
220 |
| - checkFfiResult(LIB.appendDedupOperator(ptrPlan, ptrDedup, oprIdx.getValue(), oprIdx)); |
| 169 | + ffiNodes.size() == 2, |
| 170 | + "should have 2 ffi nodes, one is `project` and the other is `dedup`"); |
| 171 | + checkFfiResult( |
| 172 | + LIB.appendProjectOperator(ptrPlan, ffiNodes.get(0), oprIdx.getValue(), oprIdx)); |
| 173 | + checkFfiResult( |
| 174 | + LIB.appendDedupOperator(ptrPlan, ffiNodes.get(1), oprIdx.getValue(), oprIdx)); |
221 | 175 | }
|
222 | 176 |
|
223 | 177 | private void appendSink(IntByReference oprIdx) {
|
|
0 commit comments