Skip to content

Commit 7637aaf

Browse files
[GIE] Refine FFI interface build_physical_plan (#2911)
<!-- Thanks for your contribution! please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? Return physical plan bytes from ffi interface `build_physical_plan` directly, instead of wrapping the physical plan in the JobRequest, which was previously bound to the pegasus engine API. <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes --------- Co-authored-by: BingqingLyu <[email protected]>
1 parent d0dc2b7 commit 7637aaf

File tree

9 files changed

+359
-314
lines changed

9 files changed

+359
-314
lines changed

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.alibaba.pegasus.RpcClient;
2727
import com.alibaba.pegasus.intf.ResultProcessor;
2828
import com.alibaba.pegasus.service.protocol.PegasusClient;
29+
import com.google.protobuf.ByteString;
2930

3031
import io.grpc.Status;
3132

@@ -49,7 +50,9 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channe
4950
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
5051
throws Exception {
5152
PegasusClient.JobRequest jobRequest =
52-
PegasusClient.JobRequest.parseFrom((byte[]) request.getRequestPhysical().build());
53+
PegasusClient.JobRequest.newBuilder()
54+
.setPlan(ByteString.copyFrom((byte[]) request.getRequestPhysical().build()))
55+
.build();
5356
PegasusClient.JobConfig jobConfig =
5457
PegasusClient.JobConfig.newBuilder()
5558
.setJobId(request.getRequestId())

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.alibaba.pegasus.intf.ResultProcessor;
4949
import com.alibaba.pegasus.service.protocol.PegasusClient;
5050
import com.google.common.collect.ImmutableList;
51+
import com.google.protobuf.ByteString;
5152
import com.google.protobuf.InvalidProtocolBufferException;
5253

5354
import org.antlr.v4.runtime.tree.ParseTree;
@@ -377,7 +378,10 @@ protected void processTraversal(
377378
byte[] physicalPlanBytes = irPlan.toPhysicalBytes(configs);
378379
irPlan.close();
379380

380-
PegasusClient.JobRequest request = PegasusClient.JobRequest.parseFrom(physicalPlanBytes);
381+
PegasusClient.JobRequest request =
382+
PegasusClient.JobRequest.newBuilder()
383+
.setPlan(ByteString.copyFrom(physicalPlanBytes))
384+
.build();
381385
PegasusClient.JobConfig jobConfig =
382386
PegasusClient.JobConfig.newBuilder()
383387
.setJobId(jobId)
@@ -423,7 +427,9 @@ protected void processRelNode(
423427
jobName,
424428
physicalBuilder.explain());
425429
PegasusClient.JobRequest request =
426-
PegasusClient.JobRequest.parseFrom(physicalPlanBytes);
430+
PegasusClient.JobRequest.newBuilder()
431+
.setPlan(ByteString.copyFrom(physicalPlanBytes))
432+
.build();
427433
PegasusClient.JobConfig jobConfig =
428434
PegasusClient.JobConfig.newBuilder()
429435
.setJobId(jobId)

0 commit comments

Comments
 (0)