Skip to content

Commit cd8e9a9

Browse files
authored
[ISSUE-408] Support Incremental match operator (#470)
1 parent 66ac534 commit cd8e9a9

File tree

21 files changed

+1125
-55
lines changed

21 files changed

+1125
-55
lines changed

geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/DSLConfigKeys.java

+22
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,28 @@
2020

2121
public class DSLConfigKeys implements Serializable {
2222

23+
private static final long serialVersionUID = 3550044668482560581L;
24+
25+
public static final ConfigKey INCR_TRAVERSAL_ITERATION_THRESHOLD = ConfigKeys
26+
.key("geaflow.dsl.incr.traversal.iteration.threshold")
27+
.defaultValue(4)
28+
.description("The max iteration to enable incr match");
29+
30+
public static final ConfigKey TABLE_SINK_SPLIT_LINE = ConfigKeys
31+
.key("geaflow.dsl.table.sink.split.line")
32+
.noDefaultValue()
33+
.description("The file sink split line.");
34+
35+
public static final ConfigKey ENABLE_INCR_TRAVERSAL = ConfigKeys
36+
.key("geaflow.dsl.graph.enable.incr.traversal")
37+
.defaultValue(false)
38+
.description("Enable incr match");
39+
40+
public static final ConfigKey INCR_TRAVERSAL_WINDOW = ConfigKeys
41+
.key("geaflow.dsl.graph.incr.traversal.window")
42+
.defaultValue(-1L)
43+
.description("When window id is large than this parameter to enable incr match");
44+
2345
public static final ConfigKey GEAFLOW_DSL_STORE_TYPE = ConfigKeys
2446
.key("geaflow.dsl.graph.store.type")
2547
.noDefaultValue()

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/traversal/dynamic/AbstractDynamicGraphVertexCentricTraversalOp.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,13 @@ public void close() {
176176
this.responses.clear();
177177
}
178178

179-
class IncGraphVCTraversalCtxImpl extends IncGraphContextImpl<K, VV, EV, M>
179+
public class IncGraphVCTraversalCtxImpl extends IncGraphContextImpl<K, VV, EV, M>
180180
implements IncVertexCentricTraversalFuncContext<K, VV, EV, M, R> {
181181

182182
private final ICollector<IGraphMessage<K, M>> messageCollector;
183183
private final String opName;
184184
private final TraversalHistoricalGraph<K, VV, EV> traversalHistoricalGraph;
185+
private boolean enableIncrMatch;
185186

186187
protected IncGraphVCTraversalCtxImpl(String opName,
187188
ICollector<IGraphMessage<K, M>> messageCollector) {
@@ -192,6 +193,15 @@ protected IncGraphVCTraversalCtxImpl(String opName,
192193
(IncHistoricalGraph<K, VV, EV>) super.getHistoricalGraph());
193194
}
194195

196+
public boolean isEnableIncrMatch() {
197+
return enableIncrMatch;
198+
}
199+
200+
public IncGraphVCTraversalCtxImpl setEnableIncrMatch(boolean enableIncrMatch) {
201+
this.enableIncrMatch = enableIncrMatch;
202+
return this;
203+
}
204+
195205
@Override
196206
public void activeRequest(ITraversalRequest<K> request) {
197207

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2023 AntGroup CO., Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
*/
14+
15+
package com.antgroup.geaflow.operator.impl.graph.traversal.dynamic;
16+
17+
18+
import com.antgroup.geaflow.api.context.RuntimeContext;
19+
import com.antgroup.geaflow.common.config.Configuration;
20+
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
21+
22+
public class DynamicGraphHelper {
23+
24+
public static boolean enableIncrTraversal(int maxIterationCount, int startIdSize, Configuration configuration) {
25+
if (configuration != null) {
26+
boolean res = configuration.getBoolean(DSLConfigKeys.ENABLE_INCR_TRAVERSAL);
27+
if (!res) {
28+
return false;
29+
}
30+
}
31+
32+
int traversalThreshold = configuration.getInteger(DSLConfigKeys.INCR_TRAVERSAL_ITERATION_THRESHOLD);
33+
// when maxIterationCount <=2 no need to include subGraph, since 1 hop is already included in the incr edges.
34+
return maxIterationCount > 2 && maxIterationCount <= traversalThreshold && startIdSize == 0;
35+
}
36+
37+
public static boolean enableIncrTraversalRuntime(RuntimeContext runtimeContext) {
38+
long windowId = runtimeContext.getWindowId();
39+
if (windowId == 1) {
40+
// the first window not need evolve
41+
return false;
42+
}
43+
long window = runtimeContext.getConfiguration().getLong(DSLConfigKeys.INCR_TRAVERSAL_WINDOW);
44+
if (window == -1) {
45+
// default do incr
46+
return true;
47+
} else {
48+
return windowId > window;
49+
}
50+
}
51+
}

geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/com/antgroup/geaflow/operator/impl/graph/traversal/dynamic/DynamicGraphVertexCentricTraversalAllOp.java

+28-8
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,36 @@ public DynamicGraphVertexCentricTraversalAllOp(
3333
super(graphViewDesc, vcTraversal);
3434
}
3535

36+
private void traversalEvolveVIds() {
37+
for (K vertexId : temporaryGraphCache.getAllEvolveVId()) {
38+
ITraversalRequest<K> traversalRequest = new VertexBeginTraversalRequest<>(vertexId);
39+
this.graphVCTraversalCtx.init(iterations, vertexId);
40+
this.incVcTraversalFunction.init(traversalRequest);
41+
}
42+
}
43+
3644
@Override
3745
protected void traversalByRequest() {
38-
if (!temporaryGraphCache.getAllEvolveVId().isEmpty()) {
39-
try (CloseableIterator<K> idIterator =
40-
graphState.dynamicGraph().V().query(GRAPH_VERSION, keyGroup).idIterator()) {
41-
while (idIterator.hasNext()) {
42-
K vertexId = idIterator.next();
43-
ITraversalRequest<K> traversalRequest = new VertexBeginTraversalRequest<>(vertexId);
44-
this.graphVCTraversalCtx.init(iterations, vertexId);
45-
this.incVcTraversalFunction.init(traversalRequest);
46+
if (graphVCTraversalCtx.isEnableIncrMatch() && DynamicGraphHelper.enableIncrTraversalRuntime(runtimeContext)) {
47+
traversalEvolveVIds();
48+
49+
} else {
50+
if (function.getMaxIterationCount() <= 2) {
51+
// The evolved vertices/edges can cover the match pattern when iteration <= 2 (e.g match(a)->(b)).
52+
traversalEvolveVIds();
53+
return;
54+
}
55+
56+
// Traversal all vertices.
57+
if (!temporaryGraphCache.getAllEvolveVId().isEmpty()) {
58+
try (CloseableIterator<K> idIterator =
59+
graphState.dynamicGraph().V().query(GRAPH_VERSION, keyGroup).idIterator()) {
60+
while (idIterator.hasNext()) {
61+
K vertexId = idIterator.next();
62+
ITraversalRequest<K> traversalRequest = new VertexBeginTraversalRequest<>(vertexId);
63+
this.graphVCTraversalCtx.init(iterations, vertexId);
64+
this.incVcTraversalFunction.init(traversalRequest);
65+
}
4666
}
4767
}
4868
}

geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-file/src/main/java/com/antgroup/geaflow/dsl/connector/file/sink/FileTableSink.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.antgroup.geaflow.api.context.RuntimeContext;
1818
import com.antgroup.geaflow.common.config.Configuration;
1919
import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
20+
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
2021
import com.antgroup.geaflow.dsl.common.data.Row;
2122
import com.antgroup.geaflow.dsl.common.types.StructType;
2223
import com.antgroup.geaflow.dsl.connector.api.TableSink;
@@ -72,6 +73,11 @@ public void write(Row row) throws IOException {
7273

7374
@Override
7475
public void finish() throws IOException {
76+
String split = tableConf.getString(DSLConfigKeys.TABLE_SINK_SPLIT_LINE.getKey(), null);
77+
if (split != null) {
78+
writer.write(split + "\n");
79+
}
80+
7581
writer.flush();
7682
}
7783

geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowCommonTraversalFunction.java

+12
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,16 @@ public void finish(long iterationId) {
123123
public void close() {
124124
executeDagGroup.close();
125125
}
126+
127+
public ExecuteDagGroup getExecuteDagGroup() {
128+
return executeDagGroup;
129+
}
130+
131+
public TraversalRuntimeContext getContext() {
132+
return context;
133+
}
134+
135+
public List<ITraversalRequest<Object>> getInitRequests() {
136+
return initRequests;
137+
}
126138
}

geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowDynamicVCTraversal.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@ public class GeaFlowDynamicVCTraversal extends IncVertexCentricTraversal<Object,
2828

2929
private final boolean isTraversalAllWithRequest;
3030

31+
private final boolean enableIncrTraversal;
32+
3133
public GeaFlowDynamicVCTraversal(ExecuteDagGroup executeDagGroup,
3234
int maxTraversal,
33-
boolean isTraversalAllWithRequest) {
35+
boolean isTraversalAllWithRequest,
36+
boolean enableIncrTraversal) {
3437
super(maxTraversal);
3538
this.executeDagGroup = executeDagGroup;
3639
this.isTraversalAllWithRequest = isTraversalAllWithRequest;
40+
this.enableIncrTraversal = enableIncrTraversal;
3741
}
3842

3943
@Override
@@ -43,6 +47,6 @@ public VertexCentricCombineFunction<MessageBox> getCombineFunction() {
4347

4448
@Override
4549
public IncVertexCentricTraversalFunction<Object, Row, Row, MessageBox, ITreePath> getIncTraversalFunction() {
46-
return new GeaFlowDynamicVCTraversalFunction(executeDagGroup, isTraversalAllWithRequest);
50+
return new GeaFlowDynamicVCTraversalFunction(executeDagGroup, isTraversalAllWithRequest, enableIncrTraversal);
4751
}
4852
}

0 commit comments

Comments
 (0)