Skip to content

Commit cd9ad2a

Browse files
authored
Merge pull request #10 from iromzy/feature/streaming
Close #8 Add Bidirectional stream with GRPC
2 parents 083f51d + e83e0f2 commit cd9ad2a

File tree

6 files changed

+703
-230
lines changed

6 files changed

+703
-230
lines changed

cmd/a2l/a2l.go

+206-65
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,34 @@ package main
33
import (
44
"C"
55
"bytes"
6-
"context"
76
"encoding/json"
87
"fmt"
98
"github.com/antlr4-go/antlr/v4"
109
"github.com/sauci/a2l-grpc/pkg/a2l"
1110
"github.com/sauci/a2l-grpc/pkg/a2l/parser"
1211
"google.golang.org/grpc"
1312
"google.golang.org/protobuf/encoding/protojson"
13+
"google.golang.org/protobuf/proto"
14+
"io"
1415
"net"
1516
"strings"
1617
"sync"
1718
)
1819

20+
var protocolSizeMargin = 256
21+
22+
func chunkifyBySize(data []byte, chunkSize int) [][]byte {
23+
var chunks [][]byte
24+
for start := 0; start < len(data); start += chunkSize {
25+
end := start + chunkSize
26+
if end > len(data) {
27+
end = len(data)
28+
}
29+
chunks = append(chunks, data[start:end])
30+
}
31+
return chunks
32+
}
33+
1934
type A2LSyntaxError struct {
2035
line, column int
2136
msg string
@@ -68,106 +83,232 @@ func getTreeFromString(a2lString string) (result *a2l.RootNodeType, err error) {
6883

6984
type grpcA2LImplType struct {
7085
a2l.UnimplementedA2LServer
86+
chunkSize int
7187
}
7288

73-
func (s *grpcA2LImplType) GetTreeFromA2L(_ context.Context, request *a2l.TreeFromA2LRequest) (result *a2l.TreeResponse, err error) {
74-
var tree *a2l.RootNodeType
89+
func (s *grpcA2LImplType) GetTreeFromA2L(stream a2l.A2L_GetTreeFromA2LServer) error {
90+
var buffer bytes.Buffer
7591
var parseError error
92+
var err error
93+
var serializedTree []byte
94+
var chunk []byte
95+
var request *a2l.TreeFromA2LRequest
96+
tree := &a2l.RootNodeType{}
97+
response := &a2l.TreeResponse{}
7698

77-
result = &a2l.TreeResponse{}
99+
for {
100+
request, err = stream.Recv()
101+
if err != nil {
102+
if err == io.EOF {
103+
err = nil
104+
}
105+
break
106+
}
107+
buffer.Write(request.A2L)
108+
}
78109

79-
if tree, parseError = getTreeFromString(string(request.A2L)); parseError == nil {
80-
result.Tree = tree
81-
} else {
82-
errString := parseError.Error()
83-
result.Error = &errString
110+
if err == nil {
111+
if tree, parseError = getTreeFromString(buffer.String()); parseError == nil {
112+
if serializedTree, err = proto.Marshal(tree); err == nil {
113+
for _, chunk = range chunkifyBySize(serializedTree, s.chunkSize) {
114+
response.SerializedTreeChunk = chunk
115+
if err = stream.Send(response); err != nil {
116+
break
117+
}
118+
}
119+
} else {
120+
response.Error = proto.String(fmt.Sprintf("An error occured during serialization of Tree: %v", err))
121+
err = stream.Send(response)
122+
}
123+
} else {
124+
errString := parseError.Error()
125+
response.Error = &errString
126+
err = stream.Send(response)
127+
}
84128
}
85129

86-
return result, err
130+
return err
87131
}
88132

89-
func (s *grpcA2LImplType) GetJSONFromTree(_ context.Context, request *a2l.JSONFromTreeRequest) (result *a2l.JSONResponse, err error) {
133+
func (s *grpcA2LImplType) GetJSONFromTree(stream a2l.A2L_GetJSONFromTreeServer) (err error) {
90134
var rawData []byte
91-
var indentedData []byte
135+
var buffer bytes.Buffer
136+
var chunk []byte
92137
var parseError error
138+
var request *a2l.JSONFromTreeRequest
139+
tree := &a2l.RootNodeType{}
140+
response := &a2l.JSONResponse{}
93141
indent := ""
94142
allowPartial := false
95143
emitUnpopulated := false
144+
// Note: optionsParsed := false avoid to parse option for each chunk
145+
optionsParsed := false
96146

97-
result = &a2l.JSONResponse{}
98-
99-
if request.Indent != nil {
100-
for i := uint32(0); i < *request.Indent; i++ {
101-
indent += " "
147+
for {
148+
request, err = stream.Recv()
149+
if err != nil {
150+
if err == io.EOF {
151+
err = nil
152+
}
153+
break
102154
}
103-
}
104-
105-
if request.AllowPartial != nil {
106-
allowPartial = *request.AllowPartial
107-
}
108155

109-
if request.EmitUnpopulated != nil {
110-
emitUnpopulated = *request.EmitUnpopulated
156+
if !optionsParsed {
157+
if request.Indent != nil {
158+
for i := uint32(0); i < *request.Indent; i++ {
159+
indent += " "
160+
}
161+
}
162+
if request.AllowPartial != nil {
163+
allowPartial = *request.AllowPartial
164+
}
165+
if request.EmitUnpopulated != nil {
166+
emitUnpopulated = *request.EmitUnpopulated
167+
}
168+
optionsParsed = true
169+
}
170+
// Use a buffer to receive all chunks
171+
buffer.Write(request.Tree)
111172
}
112-
113-
opt := protojson.MarshalOptions{
114-
AllowPartial: allowPartial,
115-
EmitUnpopulated: emitUnpopulated}
116-
117-
if rawData, parseError = opt.Marshal(request.Tree); parseError == nil {
118-
// Note: see https://github.com/golang/protobuf/issues/1121
119-
buffer := bytes.NewBuffer(indentedData)
120-
if err = json.Indent(buffer, rawData, "", indent); err == nil {
121-
result.Json = buffer.Bytes()
173+
if err == nil {
174+
if parseError = proto.Unmarshal(buffer.Bytes(), tree); parseError == nil {
175+
opt := protojson.MarshalOptions{
176+
AllowPartial: allowPartial,
177+
EmitUnpopulated: emitUnpopulated}
178+
179+
if rawData, parseError = opt.Marshal(tree); parseError == nil {
180+
// Note: see https://github.com/golang/protobuf/issues/1121
181+
var indentedBuffer bytes.Buffer
182+
if err = json.Indent(&indentedBuffer, rawData, "", indent); err == nil {
183+
rawData = indentedBuffer.Bytes()
184+
for _, chunk = range chunkifyBySize(rawData, s.chunkSize) {
185+
response.Json = chunk
186+
if err = stream.Send(response); err != nil {
187+
break
188+
}
189+
}
190+
} else {
191+
response.Error = proto.String(fmt.Sprintf("An error occured during json indent: %v", err))
192+
err = stream.Send(response)
193+
}
194+
} else {
195+
errString := parseError.Error()
196+
response.Error = &errString
197+
err = stream.Send(response)
198+
}
122199
} else {
123-
errString := err.Error()
124-
result.Error = &errString
200+
errString := parseError.Error()
201+
response.Error = &errString
202+
err = stream.Send(response)
125203
}
126-
} else {
127-
errString := parseError.Error()
128-
result.Error = &errString
129204
}
130205

131-
return result, err
206+
return err
132207
}
133208

134-
func (s *grpcA2LImplType) GetTreeFromJSON(_ context.Context, request *a2l.TreeFromJSONRequest) (result *a2l.TreeResponse, err error) {
209+
func (s *grpcA2LImplType) GetTreeFromJSON(stream a2l.A2L_GetTreeFromJSONServer) (err error) {
135210
var parseError error
211+
var buffer bytes.Buffer
212+
var serializedTree []byte
213+
var chunk []byte
214+
var request *a2l.TreeFromJSONRequest
215+
tree := &a2l.RootNodeType{}
216+
response := &a2l.TreeResponse{}
136217
allowPartial := false
218+
// Note: optionsParsed := false avoid to parse option for each chunk
219+
optionsParsed := false
137220

138-
result = &a2l.TreeResponse{Tree: &a2l.RootNodeType{}}
139-
140-
if request.AllowPartial != nil {
141-
allowPartial = *request.AllowPartial
142-
}
143-
144-
opt := protojson.UnmarshalOptions{
145-
AllowPartial: allowPartial,
221+
for {
222+
request, err = stream.Recv()
223+
if err != nil {
224+
if err == io.EOF {
225+
err = nil
226+
}
227+
break
228+
}
229+
if !optionsParsed {
230+
if request.AllowPartial != nil {
231+
allowPartial = *request.AllowPartial
232+
}
233+
optionsParsed = true
234+
}
235+
// Use a buffer to receive all chunks
236+
buffer.Write(request.Json)
146237
}
238+
if err == nil {
239+
opt := protojson.UnmarshalOptions{
240+
AllowPartial: allowPartial,
241+
}
147242

148-
if parseError = opt.Unmarshal(request.Json, result.Tree); parseError != nil {
149-
errString := parseError.Error()
150-
result.Error = &errString
243+
if parseError = opt.Unmarshal(buffer.Bytes(), tree); parseError == nil {
244+
if serializedTree, err = proto.Marshal(tree); err == nil {
245+
for _, chunk = range chunkifyBySize(serializedTree, s.chunkSize) {
246+
response.SerializedTreeChunk = chunk
247+
if err = stream.Send(response); err != nil {
248+
break
249+
}
250+
}
251+
} else {
252+
response.Error = proto.String(fmt.Sprintf("An error occured during serialization of Tree: %v", err))
253+
err = stream.Send(response)
254+
}
255+
} else {
256+
errString := parseError.Error()
257+
response.Error = &errString
258+
err = stream.Send(response)
259+
}
151260
}
152261

153-
return result, err
262+
return err
154263
}
155264

156-
func (s *grpcA2LImplType) GetA2LFromTree(_ context.Context, request *a2l.A2LFromTreeRequest) (result *a2l.A2LResponse, err error) {
265+
func (s *grpcA2LImplType) GetA2LFromTree(stream a2l.A2L_GetA2LFromTreeServer) (err error) {
266+
var buffer bytes.Buffer
267+
var request *a2l.A2LFromTreeRequest
268+
var chunk []byte
269+
var a2lDataBytes []byte
270+
tree := &a2l.RootNodeType{}
271+
response := &a2l.A2LResponse{}
157272
indent := ""
158273
sorted := false
274+
// Note: optionsParsed := false avoid to parse option for each chunk
275+
optionsParsed := false
159276

160-
if request.Indent != nil {
161-
for i := uint32(0); i < *request.Indent; i++ {
162-
indent += " "
277+
for {
278+
request, err = stream.Recv()
279+
if err != nil {
280+
if err == io.EOF {
281+
err = nil
282+
}
283+
break
284+
}
285+
if !optionsParsed {
286+
if request.Indent != nil {
287+
for i := uint32(0); i < *request.Indent; i++ {
288+
indent += " "
289+
}
290+
}
291+
if request.Sorted != nil {
292+
sorted = *request.Sorted
293+
}
294+
optionsParsed = true
163295
}
296+
// Use a buffer to receive all chunks
297+
buffer.Write(request.Tree)
164298
}
165-
166-
if request.Sorted != nil {
167-
sorted = *request.Sorted
299+
if err == nil {
300+
if err = proto.Unmarshal(buffer.Bytes(), tree); err == nil {
301+
a2lDataBytes = []byte(tree.MarshalA2L(0, indent, sorted))
302+
for _, chunk = range chunkifyBySize(a2lDataBytes, s.chunkSize) {
303+
response.A2L = chunk
304+
if err = stream.Send(response); err != nil {
305+
break
306+
}
307+
}
308+
}
168309
}
169310

170-
return &a2l.A2LResponse{A2L: []byte(request.Tree.MarshalA2L(0, indent, sorted))}, nil
311+
return err
171312
}
172313

173314
//export GetJSONByteArrayFromA2LByteArray
@@ -180,7 +321,7 @@ var serverMutex sync.Mutex
180321
var server *grpc.Server
181322

182323
//export Create
183-
func Create(port C.int) (result C.int) {
324+
func Create(port C.int, maxMsgSize C.int) (result C.int) {
184325
var err error
185326
var listener net.Listener
186327

@@ -195,9 +336,9 @@ func Create(port C.int) (result C.int) {
195336

196337
if result == 0 {
197338
if listener, err = net.Listen("tcp", fmt.Sprintf(":%v", port)); err == nil {
198-
server = grpc.NewServer(grpc.MaxRecvMsgSize(200*1024*1024), grpc.MaxSendMsgSize(200*1024*1024))
339+
server = grpc.NewServer(grpc.MaxRecvMsgSize(int(maxMsgSize)), grpc.MaxSendMsgSize(int(maxMsgSize)))
199340

200-
a2l.RegisterA2LServer(server, &grpcA2LImplType{})
341+
a2l.RegisterA2LServer(server, &grpcA2LImplType{chunkSize: int(maxMsgSize) - protocolSizeMargin})
201342

202343
go func() {
203344
err = server.Serve(listener)
@@ -227,7 +368,7 @@ func Close() (result C.int) {
227368
}
228369

229370
func main() {
230-
Create(3333)
371+
Create(3333, 4*1024*1024)
231372

232373
for {
233374
select {}

0 commit comments

Comments
 (0)