Skip to content

Commit e5d131a

Browse files
authored
Add batch request limit and response size limit in a RPC batch request (#2357)
* Add batch request limit and response size limit in a RPC batch request * Move appending result after check of total response size in batch request
1 parent 6a9b47d commit e5d131a

File tree

3 files changed

+196
-1
lines changed

3 files changed

+196
-1
lines changed

rpc/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ var (
5454
_ Error = new(invalidRequestError)
5555
_ Error = new(invalidMessageError)
5656
_ Error = new(invalidParamsError)
57+
_ Error = new(responseTooLargeError)
5758
)
5859

5960
const defaultErrorCode = -32000
@@ -101,3 +102,9 @@ type invalidParamsError struct{ message string }
101102
func (e *invalidParamsError) ErrorCode() int { return -32602 }
102103

103104
func (e *invalidParamsError) Error() string { return e.message }
105+
106+
type responseTooLargeError struct{}
107+
108+
func (e *responseTooLargeError) ErrorCode() int { return -32003 }
109+
110+
func (e *responseTooLargeError) Error() string { return "response too large" }

rpc/handler.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ import (
2828
"github.com/celo-org/celo-blockchain/log"
2929
)
3030

31+
const (
32+
BatchRequestLimit = 1000
33+
BatchResponseMaxSize = 25 * 1000 * 1000
34+
)
35+
3136
// handler handles JSON-RPC messages. There is one handler per connection. Note that
3237
// handler is not safe for concurrent use. Message handling never blocks indefinitely
3338
// because RPCs are processed on background goroutines launched by handler.
@@ -100,6 +105,13 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
100105
})
101106
return
102107
}
108+
// Returns an error if number of requests exceeds an allowed maximum
109+
if len(msgs) > BatchRequestLimit {
110+
h.startCallProc(func(cp *callProc) {
111+
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"batch too large"}))
112+
})
113+
return
114+
}
103115

104116
// Handle non-call messages first:
105117
calls := make([]*jsonrpcMessage, 0, len(msgs))
@@ -112,10 +124,20 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
112124
return
113125
}
114126
// Process calls on a goroutine because they may block indefinitely:
127+
responseBytes := 0
115128
h.startCallProc(func(cp *callProc) {
116129
answers := make([]*jsonrpcMessage, 0, len(msgs))
117-
for _, msg := range calls {
130+
for idx, msg := range calls {
118131
if answer := h.handleCallMsg(cp, msg); answer != nil {
132+
// Once total size of responses exceeds an allowed maximum,
133+
// generate error messages for all remaining calls and stop further processing
134+
if responseBytes += len(answer.Result); responseBytes > BatchResponseMaxSize {
135+
for i := idx; i < len(calls); i++ {
136+
errMsg := msg.errorResponse(&responseTooLargeError{})
137+
answers = append(answers, errMsg)
138+
}
139+
break
140+
}
119141
answers = append(answers, answer)
120142
}
121143
}

rpc/server_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ package rpc
1919
import (
2020
"bufio"
2121
"bytes"
22+
"fmt"
2223
"io"
2324
"io/ioutil"
2425
"net"
2526
"path/filepath"
2627
"strings"
2728
"testing"
2829
"time"
30+
31+
"github.com/stretchr/testify/require"
2932
)
3033

3134
func TestServerRegisterName(t *testing.T) {
@@ -150,3 +153,166 @@ func TestServerShortLivedConn(t *testing.T) {
150153
}
151154
}
152155
}
156+
157+
// TestBatchRequestLimit verifies that the server returns "batch too large" error when the number
158+
// of JSON-RPC calls in a batch exceeds the defined limit
159+
func TestBatchRequestLimit(t *testing.T) {
160+
server := newTestServer()
161+
defer server.Stop()
162+
163+
listener, err := net.Listen("tcp", "127.0.0.1:0")
164+
if err != nil {
165+
t.Fatal("can't listen:", err)
166+
}
167+
defer listener.Close()
168+
go server.ServeListener(listener)
169+
170+
var (
171+
request = `{"jsonrpc":"2.0","id":1,"method":"rpc_modules"}`
172+
wantResp = `{"jsonrpc":"2.0","id":null,"error":{"code":-32600,"message":"batch too large"}}` + "\n"
173+
deadline = time.Now().Add(10 * time.Second)
174+
)
175+
176+
// Create a batch request containing (BatchRequestLimit + 1) calls
177+
var reqBuf bytes.Buffer
178+
reqBuf.WriteString("[")
179+
for i := 0; i < BatchRequestLimit+1; i++ {
180+
if i > 0 {
181+
reqBuf.WriteString(",")
182+
}
183+
reqBuf.WriteString(request)
184+
}
185+
reqBuf.WriteString("]\n")
186+
187+
// Write the request to the server and then close the write side of the connection
188+
conn, err := net.Dial("tcp", listener.Addr().String())
189+
if err != nil {
190+
t.Fatal("can't dial:", err)
191+
}
192+
defer conn.Close()
193+
conn.SetDeadline(deadline)
194+
conn.Write(reqBuf.Bytes())
195+
conn.(*net.TCPConn).CloseWrite()
196+
197+
// Verify that the server returns the "batch too large" error
198+
buf := make([]byte, 100)
199+
n, err := conn.Read(buf)
200+
if err != nil {
201+
t.Fatal("read error:", err)
202+
}
203+
if !bytes.Equal(buf[:n], []byte(wantResp)) {
204+
t.Fatalf("wrong response: expected=%s, got=%s", wantResp, buf[:n])
205+
}
206+
207+
// Ensure that the connection is closed and no additional data is returned (EOF expected)
208+
n, err = conn.Read(make([]byte, 1))
209+
require.Zero(t, n)
210+
require.ErrorIs(t, io.EOF, err)
211+
}
212+
213+
// TestBatchResponseMaxSize verifies that the server returns successful responses
214+
// until the total response size exceeds the configured maximum. Once the threshold is exceeded,
215+
// the server should respond with a specific "response too large" error for the remaining requests
216+
// in the batch and then close the connection.
217+
func TestBatchResponseMaxSize(t *testing.T) {
218+
server := newTestServer()
219+
defer server.Stop()
220+
221+
listener, err := net.Listen("tcp", "127.0.0.1:0")
222+
if err != nil {
223+
t.Fatal("can't listen:", err)
224+
}
225+
defer listener.Close()
226+
go server.ServeListener(listener)
227+
228+
var (
229+
strSize = 25 * 1000
230+
strValue = strings.Repeat("A", strSize)
231+
232+
successfulResultData = fmt.Sprintf(`{"String":"%s","Int":1,"Args":null}`, strValue)
233+
successfulResponse = fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"result":%s}`, successfulResultData)
234+
successfulResultDataLen = len(successfulResultData)
235+
successfulResponseLen = len(successfulResponse)
236+
tooLargeErrorResponse = `{"jsonrpc":"2.0","id":1,"error":{"code":-32003,"message":"response too large"}}`
237+
238+
deadline = time.Now().Add(10 * time.Second)
239+
)
240+
241+
// create a batch request
242+
var reqBuf bytes.Buffer
243+
reqBuf.WriteString("[")
244+
for i := 0; i < BatchRequestLimit; i++ {
245+
if i > 0 {
246+
reqBuf.WriteString(",")
247+
}
248+
reqBuf.WriteString(fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"test_echo","params":["%s",1]}`, strValue))
249+
}
250+
reqBuf.WriteString("]\n")
251+
252+
// send request
253+
conn, err := net.Dial("tcp", listener.Addr().String())
254+
if err != nil {
255+
t.Fatal("can't dial:", err)
256+
}
257+
defer conn.Close()
258+
conn.SetDeadline(deadline)
259+
conn.Write(reqBuf.Bytes())
260+
conn.(*net.TCPConn).CloseWrite()
261+
262+
buf := make([]byte, successfulResponseLen)
263+
264+
// mustConsume reads from the connection until the entire provided buffer is filled
265+
var mustConsume func(t *testing.T, buf []byte) int
266+
mustConsume = func(t *testing.T, buf []byte) int {
267+
t.Helper()
268+
n, err := conn.Read(buf)
269+
if err != nil {
270+
t.Fatal("read error:", err)
271+
}
272+
if n < len(buf) {
273+
mustConsume(t, buf[n:])
274+
}
275+
return n
276+
}
277+
// mustConsumeAndCompare consumes from the connection and compares the read data with the given expected data
278+
mustConsumeAndCompare := func(t *testing.T, expected []byte) {
279+
t.Helper()
280+
mustConsume(t, buf[:len(expected)])
281+
if !bytes.Equal(buf[:len(expected)], expected) {
282+
t.Fatalf("wrong response: expected=%s, actual=%s", expected, buf[:len(expected)])
283+
}
284+
}
285+
286+
var (
287+
totalResultSize = 0
288+
errorBeginsAt = 0
289+
)
290+
291+
mustConsumeAndCompare(t, []byte("["))
292+
// Read through each response until the cumulative size limit is exceeded
293+
for i := 0; i < BatchRequestLimit; i++ {
294+
if totalResultSize += successfulResultDataLen; totalResultSize > BatchResponseMaxSize {
295+
// Record the first index where the error should begin
296+
errorBeginsAt = i
297+
break
298+
}
299+
300+
if i > 0 {
301+
mustConsumeAndCompare(t, []byte(","))
302+
}
303+
mustConsumeAndCompare(t, []byte(successfulResponse))
304+
}
305+
306+
// From the point where the total size exceeded the limit,
307+
// check whether all responses for the remaining calls is "response too large"
308+
for i := errorBeginsAt; i < BatchRequestLimit; i++ {
309+
mustConsumeAndCompare(t, []byte(","))
310+
mustConsumeAndCompare(t, []byte(tooLargeErrorResponse))
311+
}
312+
mustConsumeAndCompare(t, []byte("]\n"))
313+
314+
// Ensure that the connection is closed and no additional data is returned (EOF expected)
315+
n, err := conn.Read(make([]byte, 1))
316+
require.Zero(t, n)
317+
require.ErrorIs(t, io.EOF, err)
318+
}

0 commit comments

Comments
 (0)