Skip to content

Commit 2fcb056

Browse files
authored
Merge pull request #940 from tcrayford/error_if_message_magic_byte_is_unknown
make decoding unknown message versions error
2 parents 55b3f6a + f5ef216 commit 2fcb056

File tree

3 files changed

+40
-3
lines changed

3 files changed

+40
-3
lines changed

crc32_field.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sarama
22

33
import (
44
"encoding/binary"
5+
"fmt"
56
"hash/crc32"
67
)
78

@@ -27,8 +28,9 @@ func (c *crc32Field) run(curOffset int, buf []byte) error {
2728
func (c *crc32Field) check(curOffset int, buf []byte) error {
2829
crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
2930

30-
if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
31-
return PacketDecodingError{"CRC didn't match"}
31+
expected := binary.BigEndian.Uint32(buf[c.startOffset:])
32+
if crc != expected {
33+
return PacketDecodingError{fmt.Sprintf("CRC didn't match expected %#x got %#x", expected, crc)}
3234
}
3335

3436
return nil

message.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,17 @@ func (m *Message) decode(pd packetDecoder) (err error) {
122122
return err
123123
}
124124

125+
if m.Version > 1 {
126+
return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
127+
}
128+
125129
attribute, err := pd.getInt8()
126130
if err != nil {
127131
return err
128132
}
129133
m.Codec = CompressionCodec(attribute & compressionCodecMask)
130134

131-
if m.Version >= 1 {
135+
if m.Version == 1 {
132136
millis, err := pd.getInt64()
133137
if err != nil {
134138
return err

message_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,21 @@ var (
1414
0xFF, 0xFF, 0xFF, 0xFF, // key
1515
0xFF, 0xFF, 0xFF, 0xFF} // value
1616

17+
emptyV1Message = []byte{
18+
204, 47, 121, 217, // CRC
19+
0x01, // magic version byte
20+
0x00, // attribute flags
21+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
22+
0xFF, 0xFF, 0xFF, 0xFF, // key
23+
0xFF, 0xFF, 0xFF, 0xFF} // value
24+
25+
emptyV2Message = []byte{
26+
167, 236, 104, 3, // CRC
27+
0x02, // magic version byte
28+
0x00, // attribute flags
29+
0xFF, 0xFF, 0xFF, 0xFF, // key
30+
0xFF, 0xFF, 0xFF, 0xFF} // value
31+
1732
emptyGzipMessage = []byte{
1833
97, 79, 149, 90, //CRC
1934
0x00, // magic version byte
@@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
179194
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
180195
}
181196
}
197+
198+
func TestMessageDecodingVersion1(t *testing.T) {
199+
message := Message{Version: 1}
200+
testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
201+
}
202+
203+
func TestMessageDecodingUnknownVersions(t *testing.T) {
204+
message := Message{Version: 2}
205+
err := decode(emptyV2Message, &message)
206+
if err == nil {
207+
t.Error("Decoding did not produce an error for an unknown magic byte")
208+
}
209+
if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
210+
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
211+
}
212+
}

0 commit comments

Comments
 (0)