|
3 | 3 | package sarama
|
4 | 4 |
|
5 | 5 | import (
|
6 |
| - "fmt" |
| 6 | + "encoding/binary" |
| 7 | + "math" |
7 | 8 | "reflect"
|
8 | 9 | "testing"
|
9 | 10 | "time"
|
@@ -256,39 +257,49 @@ func TestRecordBatchDecoding(t *testing.T) {
|
256 | 257 | }
|
257 | 258 | }
|
258 | 259 |
|
259 |
| -func TestRecordBatchInvalidNumRecords(t *testing.T) { |
| 260 | +func TestRecordBatchLargeNumRecords(t *testing.T) { |
| 261 | + numOfRecords := 10 + (2 * math.MaxUint16) |
| 262 | + numofRecordsBytes := make([]byte, 4) |
| 263 | + binary.BigEndian.PutUint32(numofRecordsBytes, uint32(numOfRecords)) |
| 264 | + |
260 | 265 | encodedBatch := []byte{
|
261 | 266 | 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
|
262 |
| - 0, 0, 0, 70, // Length |
| 267 | + 0, 42, 0, 250, // Length |
263 | 268 | 0, 0, 0, 0, // Partition Leader Epoch
|
264 |
| - 2, // Version |
265 |
| - 91, 48, 202, 99, // CRC |
| 269 | + 2, // Version |
| 270 | + 103, 68, 166, 213, // CRC |
266 | 271 | 0, 0, // Attributes
|
267 | 272 | 0, 0, 0, 0, // Last Offset Delta
|
268 | 273 | 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
|
269 | 274 | 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
|
270 | 275 | 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
|
271 | 276 | 0, 0, // Producer Epoch
|
272 | 277 | 0, 0, 0, 0, // First Sequence
|
273 |
| - 0, 1, 255, 255, // Number of Records - 1 + 2*math.MaxUint16 |
274 |
| - 40, // Record Length |
275 |
| - 0, // Attributes |
276 |
| - 10, // Timestamp Delta |
277 |
| - 0, // Offset Delta |
278 |
| - 8, // Key Length |
279 |
| - 1, 2, 3, 4, |
280 |
| - 6, // Value Length |
281 |
| - 5, 6, 7, |
282 |
| - 2, // Number of Headers |
283 |
| - 6, // Header Key Length |
284 |
| - 8, 9, 10, // Header Key |
285 |
| - 4, // Header Value Length |
286 |
| - 11, 12, // Header Value |
287 | 278 | }
|
288 | 279 |
|
289 |
| - batch := RecordBatch{} |
| 280 | + encodedBatch = append(encodedBatch, numofRecordsBytes...) |
| 281 | + |
| 282 | + for range numOfRecords { |
| 283 | + encodedBatch = append(encodedBatch, []byte{ |
| 284 | + 40, // Record Length |
| 285 | + 0, // Attributes |
| 286 | + 10, // Timestamp Delta |
| 287 | + 0, // Offset Delta |
| 288 | + 8, // Key Length |
| 289 | + 1, 2, 3, 4, |
| 290 | + 6, // Value Length |
| 291 | + 5, 6, 7, |
| 292 | + 2, // Number of Headers |
| 293 | + 6, // Header Key Length |
| 294 | + 8, 9, 10, // Header Key |
| 295 | + 4, // Header Value Length |
| 296 | + 11, 12, // Header Value |
| 297 | + }...) |
| 298 | + } |
| 299 | + |
| 300 | + var batch RecordBatch |
290 | 301 | err := decode(encodedBatch, &batch, nil)
|
291 |
| - if err != ErrInsufficientData { |
292 |
| - t.Fatal(fmt.Errorf("was suppose to get ErrInsufficientData, instead got: %w", err)) |
| 302 | + if err != nil { |
| 303 | + t.Fatal("received error while decoding record batch", err) |
293 | 304 | }
|
294 | 305 | }
|
0 commit comments