Skip to content

Commit 8b7d2fe

Browse files
authored
grpc: fix bug causing an extra Read if a compressed message is the same size as the limit (#8181)
1 parent 9c81a91 commit 8b7d2fe

File tree

3 files changed

+65
-8
lines changed

3 files changed

+65
-8
lines changed

clientconn_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050

5151
const (
5252
defaultTestTimeout = 10 * time.Second
53+
defaultTestShortTimeout = 10 * time.Millisecond
5354
stateRecordingBalancerName = "state_recording_balancer"
5455
)
5556

rpc_util.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -870,13 +870,19 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompress
870870
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the message: %v", err)
871871
}
872872

873-
out, err := mem.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)), pool)
873+
// Read at most one byte more than the limit from the decompressor.
874+
// Unless the limit is MaxInt64, in which case, that's impossible, so
875+
// apply no limit.
876+
if limit := int64(maxReceiveMessageSize); limit < math.MaxInt64 {
877+
dcReader = io.LimitReader(dcReader, limit+1)
878+
}
879+
out, err := mem.ReadAll(dcReader, pool)
874880
if err != nil {
875881
out.Free()
876882
return nil, status.Errorf(codes.Internal, "grpc: failed to read decompressed data: %v", err)
877883
}
878884

879-
if out.Len() == maxReceiveMessageSize && !atEOF(dcReader) {
885+
if out.Len() > maxReceiveMessageSize {
880886
out.Free()
881887
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", maxReceiveMessageSize)
882888
}
@@ -885,12 +891,6 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, dc Decompress
885891
return nil, status.Errorf(codes.Internal, "grpc: no decompressor available for compressed payload")
886892
}
887893

888-
// atEOF reads data from r and returns true if zero bytes could be read and r.Read returns EOF.
889-
func atEOF(dcReader io.Reader) bool {
890-
n, err := dcReader.Read(make([]byte, 1))
891-
return n == 0 && err == io.EOF
892-
}
893-
894894
type recvCompressor interface {
895895
RecvCompress() string
896896
}

rpc_util_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ package grpc
2121
import (
2222
"bytes"
2323
"compress/gzip"
24+
"context"
2425
"errors"
2526
"io"
2627
"math"
2728
"reflect"
29+
"sync"
2830
"testing"
2931

3032
"github.com/google/go-cmp/cmp"
@@ -421,3 +423,57 @@ func (s) TestDecompress(t *testing.T) {
421423
})
422424
}
423425
}
426+
427+
type mockCompressor struct {
428+
// Written to by the io.Reader on every call to Read.
429+
ch chan<- struct{}
430+
}
431+
432+
func (m *mockCompressor) Compress(io.Writer) (io.WriteCloser, error) {
433+
panic("unimplemented")
434+
}
435+
436+
func (m *mockCompressor) Decompress(io.Reader) (io.Reader, error) {
437+
return m, nil
438+
}
439+
440+
func (m *mockCompressor) Read([]byte) (int, error) {
441+
m.ch <- struct{}{}
442+
return 1, io.EOF
443+
}
444+
445+
func (m *mockCompressor) Name() string { return "" }
446+
447+
// Tests that the decompressor's Read method is not called after it returns EOF.
448+
func (s) TestDecompress_NoReadAfterEOF(t *testing.T) {
449+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
450+
defer cancel()
451+
452+
ch := make(chan struct{}, 10)
453+
mc := &mockCompressor{ch: ch}
454+
in := mem.BufferSlice{mem.NewBuffer(&[]byte{1, 2, 3}, nil)}
455+
wg := sync.WaitGroup{}
456+
wg.Add(1)
457+
go func() {
458+
defer wg.Done()
459+
out, err := decompress(mc, in, nil, 1, mem.DefaultBufferPool())
460+
if err != nil {
461+
t.Errorf("Unexpected error from decompress: %v", err)
462+
return
463+
}
464+
out.Free()
465+
}()
466+
select {
467+
case <-ch:
468+
case <-ctx.Done():
469+
t.Fatalf("Timed out waiting for call to compressor")
470+
}
471+
ctx, cancel = context.WithTimeout(ctx, defaultTestShortTimeout)
472+
defer cancel()
473+
select {
474+
case <-ch:
475+
t.Fatalf("Unexpected second compressor.Read call detected")
476+
case <-ctx.Done():
477+
}
478+
wg.Wait()
479+
}

0 commit comments

Comments
 (0)