
Commit bc2f7cc

don't limit the number of records in a record batch when decoding
created `getArrayLengthNoLimit`, as record batches can contain more than `2*math.MaxUint16` records. The packet decoder will make sure that the array length isn't greater than the number of bytes remaining to be decoded in the packet. Also added a test for large record counts.

Fixes: #3119

Signed-off-by: Ryan Belgrave <[email protected]>
1 parent c7ca87e commit bc2f7cc
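
For context: the pre-existing `getArrayLength` in real_decoder.go is the capped variant that this commit works around. A rough paraphrase of what it does (not part of this diff; the exact error wording is an assumption) — it reads the same big-endian int32 length, but additionally rejects anything above 2*math.MaxUint16:

func (rd *realDecoder) getArrayLength() (int, error) {
    if rd.remaining() < 4 {
        rd.off = len(rd.raw)
        return -1, ErrInsufficientData
    }
    tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
    rd.off += 4
    if tmp > rd.remaining() {
        rd.off = len(rd.raw)
        return -1, ErrInsufficientData
    } else if tmp > 2*math.MaxUint16 {
        // This cap is what getArrayLengthNoLimit drops; the bounds
        // check against rd.remaining() above is kept.
        return -1, PacketDecodingError{"invalid array length"}
    }
    return tmp, nil
}

Because the new `getArrayLengthNoLimit` still requires the length to fit in the remaining bytes, an absurd length fails fast with ErrInsufficientData instead of driving a huge allocation.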

4 files changed: 58 additions, 1 deletion


packet_decoder.go

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ type packetDecoder interface {
     getUVarint() (uint64, error)
     getFloat64() (float64, error)
     getArrayLength() (int, error)
+    getArrayLengthNoLimit() (int, error)
     getCompactArrayLength() (int, error)
     getBool() (bool, error)
     getEmptyTaggedFieldArray() (int, error)

real_decoder.go

Lines changed: 14 additions & 0 deletions

@@ -121,6 +121,20 @@ func (rd *realDecoder) getArrayLength() (int, error) {
     return tmp, nil
 }
 
+func (rd *realDecoder) getArrayLengthNoLimit() (int, error) {
+    if rd.remaining() < 4 {
+        rd.off = len(rd.raw)
+        return -1, ErrInsufficientData
+    }
+    tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
+    rd.off += 4
+    if tmp > rd.remaining() {
+        rd.off = len(rd.raw)
+        return -1, ErrInsufficientData
+    }
+    return tmp, nil
+}
+
 func (rd *realDecoder) getCompactArrayLength() (int, error) {
     n, err := rd.getUVarint()
     if err != nil {

record_batch.go

Lines changed: 5 additions & 1 deletion

@@ -157,7 +157,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
         return err
     }
 
-    numRecs, err := pd.getArrayLength()
+    // Using NoLimit because a single record batch could contain
+    // more than 2*math.MaxUint16 records. The packet decoder will
+    // check to make sure the array is not greater than the
+    // remaining bytes.
+    numRecs, err := pd.getArrayLengthNoLimit()
     if err != nil {
         return err
     }

record_test.go

Lines changed: 38 additions & 0 deletions

@@ -3,6 +3,7 @@
 package sarama
 
 import (
+    "fmt"
     "reflect"
     "testing"
     "time"

@@ -254,3 +255,40 @@ func TestRecordBatchDecoding(t *testing.T) {
         }
     }
 }
+
+func TestRecordBatchInvalidNumRecords(t *testing.T) {
+    encodedBatch := []byte{
+        0, 0, 0, 0, 0, 0, 0, 0, // First Offset
+        0, 0, 0, 70, // Length
+        0, 0, 0, 0, // Partition Leader Epoch
+        2, // Version
+        91, 48, 202, 99, // CRC
+        0, 0, // Attributes
+        0, 0, 0, 0, // Last Offset Delta
+        0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
+        0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
+        0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
+        0, 0, // Producer Epoch
+        0, 0, 0, 0, // First Sequence
+        0, 1, 255, 255, // Number of Records - 1 + 2*math.MaxUint16
+        40, // Record Length
+        0, // Attributes
+        10, // Timestamp Delta
+        0, // Offset Delta
+        8, // Key Length
+        1, 2, 3, 4,
+        6, // Value Length
+        5, 6, 7,
+        2, // Number of Headers
+        6, // Header Key Length
+        8, 9, 10, // Header Key
+        4, // Header Value Length
+        11, 12, // Header Value
+    }
+
+    batch := RecordBatch{}
+    err := decode(encodedBatch, &batch, nil)
+    if err != ErrInsufficientData {
+        t.Fatal(fmt.Errorf("was supposed to get ErrInsufficientData, instead got: %w", err))
+    }
+}
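
As a side note, the four "Number of Records" bytes in the fixture decode to a value just above the old cap. A quick standalone check (not part of the commit, illustration only):

package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

func main() {
    // The "Number of Records" field from the test fixture above.
    numRecs := binary.BigEndian.Uint32([]byte{0, 1, 255, 255})
    fmt.Println(numRecs)                       // 131071
    fmt.Println(numRecs == 2*math.MaxUint16+1) // true: one past the old limit
}

Since 131071 is far larger than the handful of bytes left in the fixture, the bounds check in getArrayLengthNoLimit returns ErrInsufficientData, which is exactly what the test asserts.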
