-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(decoder): use configurable limit for max number of records in a record batch #3120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ | |
package sarama | ||
|
||
import ( | ||
"encoding/binary" | ||
"math" | ||
"reflect" | ||
"testing" | ||
"time" | ||
|
@@ -254,3 +256,50 @@ func TestRecordBatchDecoding(t *testing.T) { | |
} | ||
} | ||
} | ||
|
||
func TestRecordBatchLargeNumRecords(t *testing.T) { | ||
numOfRecords := 10 + (2 * math.MaxUint16) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure how useful making this parameterizable is, when the CRC will change if the value changes, but then I like that it’s apparent and inarguable what length we’re encoding. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea I only set it as a var to make it obvious what it is. I kept forgetting what I had it set to during testing, so it's probably useful just to have it easily readable like you said haha. |
||
numofRecordsBytes := make([]byte, 4) | ||
binary.BigEndian.PutUint32(numofRecordsBytes, uint32(numOfRecords)) | ||
|
||
encodedBatch := []byte{ | ||
0, 0, 0, 0, 0, 0, 0, 0, // First Offset | ||
0, 42, 0, 250, // Length | ||
0, 0, 0, 0, // Partition Leader Epoch | ||
2, // Version | ||
103, 68, 166, 213, // CRC | ||
0, 0, // Attributes | ||
0, 0, 0, 0, // Last Offset Delta | ||
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp | ||
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp | ||
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID | ||
0, 0, // Producer Epoch | ||
0, 0, 0, 0, // First Sequence | ||
} | ||
|
||
encodedBatch = append(encodedBatch, numofRecordsBytes...) | ||
|
||
for range numOfRecords { | ||
encodedBatch = append(encodedBatch, []byte{ | ||
40, // Record Length | ||
0, // Attributes | ||
10, // Timestamp Delta | ||
0, // Offset Delta | ||
8, // Key Length | ||
1, 2, 3, 4, | ||
6, // Value Length | ||
5, 6, 7, | ||
2, // Number of Headers | ||
6, // Header Key Length | ||
8, 9, 10, // Header Key | ||
4, // Header Value Length | ||
11, 12, // Header Value | ||
}...) | ||
} | ||
|
||
var batch RecordBatch | ||
err := decode(encodedBatch, &batch, nil) | ||
if err != nil { | ||
t.Fatal("received error while decoding record batch", err) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.