Skip to content

Commit 25496f8

Browse files
authored
Connect to frontend directly, bypass coordinator for GIE query (#2923)
1 parent 96d5093 commit 25496f8

File tree

20 files changed

+416
-523
lines changed

20 files changed

+416
-523
lines changed

.github/workflows/local-ci.yml

+4-6
Original file line numberDiff line numberDiff line change
@@ -298,18 +298,16 @@ jobs:
298298
# install java
299299
sudo apt update -y && sudo apt install openjdk-11-jdk -y
300300
301+
- name: Setup tmate session
302+
uses: mxschmitt/action-tmate@v3
303+
if: false
304+
301305
- name: Run Minimum Test
302306
env:
303307
GS_TEST_DIR: ${{ github.workspace }}/gstest
304308
run: |
305-
git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR}
306-
307309
python3 -m pytest -s -v $(dirname $(python3 -c "import graphscope; print(graphscope.__file__)"))/tests/minitest
308310
309-
- name: Setup tmate session
310-
uses: mxschmitt/action-tmate@v3
311-
if: false
312-
313311
- name: Upload GIE log
314312
if: failure()
315313
uses: actions/upload-artifact@v3

coordinator/gscoordinator/coordinator.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
from gscoordinator.dag_manager import GSEngine
6060
from gscoordinator.kubernetes_launcher import KubernetesClusterLauncher
6161
from gscoordinator.monitor import Monitor
62-
from gscoordinator.object_manager import InteractiveQueryManager
62+
from gscoordinator.object_manager import InteractiveInstanceManager
6363
from gscoordinator.object_manager import LearningInstanceManager
6464
from gscoordinator.object_manager import ObjectManager
6565
from gscoordinator.op_executor import OperationExecutor
@@ -454,7 +454,7 @@ def _match_frontend_endpoint(pattern, lines):
454454
proc = self._launcher.create_interactive_instance(
455455
object_id, schema_path, params
456456
)
457-
gie_manager = InteractiveQueryManager(object_id)
457+
gie_manager = InteractiveInstanceManager(object_id)
458458
# Put it to object_manager to ensure it could be killed during coordinator cleanup
459459
# If coordinator is shutdown by force when creating interactive instance
460460
self._object_manager.put(object_id, gie_manager)

coordinator/gscoordinator/dag_manager.py

-2
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ class DAGManager(object):
6666

6767
_interactive_engine_split_op = [
6868
types_pb2.SUBGRAPH,
69-
types_pb2.GREMLIN_QUERY,
70-
types_pb2.FETCH_GREMLIN_RESULT,
7169
]
7270

7371
_learning_engine_split_op = []

coordinator/gscoordinator/monitor.py

-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@
5555
22: "INDUCE_SUBGRAPH",
5656
23: "UNLOAD_CONTEXT",
5757
32: "SUBGRAPH",
58-
33: "GREMLIN_QUERY",
59-
34: "FETCH_GREMLIN_RESULT",
6058
46: "DATA_SOURCE",
6159
47: "DATA_SINK",
6260
50: "CONTEXT_TO_NUMPY",

coordinator/gscoordinator/object_manager.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ def __init__(self, key, object_id, graph_def, schema_path=None):
3535
self.schema_path = schema_path
3636

3737

38-
class InteractiveQueryManager(object):
39-
def __init__(self, object_id, endpoint=None):
38+
class InteractiveInstanceManager(object):
39+
def __init__(self, object_id):
4040
self.type = "gie_manager"
41-
# graph object id in vineyard
4241
self.object_id = object_id
43-
self.endpoint = endpoint
42+
self.endpoint = None
4443
self.client = None
4544

4645
def set_endpoint(self, endpoint):
@@ -51,7 +50,6 @@ def __del__(self):
5150
try:
5251
self.client.close()
5352
except Exception:
54-
# TODO(siyuan): throws no event loop exception with tornado 5.1.1
5553
pass
5654

5755
def submit(self, message, bindings=None, request_options=None):
@@ -62,13 +60,6 @@ def submit(self, message, bindings=None, request_options=None):
6260
return self.client.submit(message, bindings, request_options)
6361

6462

65-
class GremlinResultSet(object):
66-
def __init__(self, key, result_set):
67-
self.key = key
68-
self.type = "result_set"
69-
self.result_set = result_set
70-
71-
7263
class LearningInstanceManager(object):
7364
def __init__(self, object_id):
7465
self.type = "gle_manager"

coordinator/gscoordinator/op_executor.py

+1-39
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
from gscoordinator.monitor import Monitor
3232
from gscoordinator.object_manager import GraphMeta
33-
from gscoordinator.object_manager import GremlinResultSet
3433
from gscoordinator.object_manager import LibMeta
3534
from gscoordinator.utils import ANALYTICAL_BUILTIN_SPACE
3635
from gscoordinator.utils import ANALYTICAL_ENGINE_JAVA_INIT_CLASS_PATH
@@ -441,11 +440,7 @@ def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef):
441440
for op in dag_def.op:
442441
self._key_to_op[op.key] = op
443442
op_pre_process(op, self._op_result_pool, self._key_to_op)
444-
if op.op == types_pb2.GREMLIN_QUERY:
445-
op_result = self._execute_gremlin_query(op)
446-
elif op.op == types_pb2.FETCH_GREMLIN_RESULT:
447-
op_result = self._fetch_gremlin_result(op)
448-
elif op.op == types_pb2.SUBGRAPH:
443+
if op.op == types_pb2.SUBGRAPH:
449444
op_result = self._gremlin_to_subgraph(op)
450445
else:
451446
raise RuntimeError("Unsupported op type: " + str(op.op))
@@ -454,39 +449,6 @@ def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef):
454449
self._op_result_pool[op.key] = op_result
455450
return message_pb2.RunStepResponse(head=response_head), []
456451

457-
def _execute_gremlin_query(self, op: op_def_pb2.OpDef):
458-
logger.debug("execute gremlin query")
459-
message = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
460-
request_options = None
461-
if types_pb2.GIE_GREMLIN_REQUEST_OPTIONS in op.attr:
462-
request_options = json.loads(
463-
op.attr[types_pb2.GIE_GREMLIN_REQUEST_OPTIONS].s.decode()
464-
)
465-
object_id = op.attr[types_pb2.VINEYARD_ID].i
466-
gremlin_client = self._object_manager.get(object_id)
467-
rlt = gremlin_client.submit(message, request_options=request_options)
468-
logger.debug("put %s, client %s", op.key, gremlin_client)
469-
self._object_manager.put(op.key, GremlinResultSet(op.key, rlt))
470-
return op_def_pb2.OpResult(code=OK, key=op.key)
471-
472-
def _fetch_gremlin_result(self, op: op_def_pb2.OpDef):
473-
fetch_result_type = op.attr[types_pb2.GIE_GREMLIN_FETCH_RESULT_TYPE].s.decode()
474-
key_of_parent_op = op.parents[0]
475-
result_set = self._object_manager.get(key_of_parent_op).result_set
476-
if fetch_result_type == "one":
477-
rlt = result_set.one()
478-
elif fetch_result_type == "all":
479-
rlt = result_set.all().result()
480-
else:
481-
raise RuntimeError("Not supported fetch result type: " + fetch_result_type)
482-
# Large data should be fetched use gremlin pagination
483-
# meta = op_def_pb2.OpResult.Meta(has_large_result=True)
484-
return op_def_pb2.OpResult(
485-
code=OK,
486-
key=op.key,
487-
result=pickle.dumps(rlt),
488-
)
489-
490452
def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):
491453
gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
492454
oid_type = op.attr[types_pb2.OID_TYPE].s.decode()

docs/interactive_engine/getting_started.md

+11-4
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,33 @@ graph = load_modern_graph()
6969
g = gs.gremlin(graph)
7070
# then `execute` any supported gremlin query.
7171
q1 = g.execute('g.V().count()')
72-
print(q1.all()) # should print [6]
72+
print(q1.all().result()) # should print [6]
7373

7474
q2 = g.execute('g.V().hasLabel(\'person\')')
75-
print(q2.all()) # should print [[v[2], v[3], v[0], v[1]]]
75+
print(q2.all().result())  # should print [v[2], v[3], v[0], v[1]]
7676
```
7777

7878
You may see something like:
7979
```Shell
8080
...
8181
... [INFO][coordinator:453]: Built interactive frontend xxx.xxx.xxx.xxx:pppp for graph xxx
82-
... [INFO][op_executor:455]: execute gremlin query
8382
[6]
8483
...
85-
... [INFO][op_executor:455]: execute gremlin query
8684
[v[2], v[3], v[0], v[1]]
8785
...
8886
```
8987
9088
The number 6 is printed, which is the number of vertices in the modern graph.
9189
90+
### Retrieve the gremlin client
91+
92+
The `g` returned by `gs.gremlin()` is a wrapper around the `Client` class of `gremlinpython`; you can retrieve the underlying `Client` with:
93+
94+
```python
95+
client = g.gremlin_client
96+
print(client.submit('g.V()').all().result())
97+
```
98+
9299
### Customize Configurations for GIE instance
93100
94101
You could pass additional key-value pairs to customize the startup configuration of GIE, for example:

0 commit comments

Comments
 (0)