Skip to content

Commit 0eb6982

Browse files
committed
make decoding unknown message versions error
If sarama encounters an unknown message version somehow (e.g. due to a bug in Kafka 0.11's downconversion), right now sarama will carry on decoding the message, resulting in much harder to understand errors. Instead, bail out early with a friendly error message.
1 parent 55b3f6a commit 0eb6982

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

message.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,17 @@ func (m *Message) decode(pd packetDecoder) (err error) {
122122
return err
123123
}
124124

125+
if m.Version > 1 {
126+
return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
127+
}
128+
125129
attribute, err := pd.getInt8()
126130
if err != nil {
127131
return err
128132
}
129133
m.Codec = CompressionCodec(attribute & compressionCodecMask)
130134

131-
if m.Version >= 1 {
135+
if m.Version == 1 {
132136
millis, err := pd.getInt64()
133137
if err != nil {
134138
return err

message_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,21 @@ var (
1414
0xFF, 0xFF, 0xFF, 0xFF, // key
1515
0xFF, 0xFF, 0xFF, 0xFF} // value
1616

17+
emptyV1Message = []byte{
18+
167, 236, 104, 3, // CRC
19+
0x01, // magic version byte
20+
0x00, // attribute flags
21+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
22+
0xFF, 0xFF, 0xFF, 0xFF, // key
23+
0xFF, 0xFF, 0xFF, 0xFF} // value
24+
25+
emptyV2Message = []byte{
26+
167, 236, 104, 3, // CRC
27+
0x02, // magic version byte
28+
0x00, // attribute flags
29+
0xFF, 0xFF, 0xFF, 0xFF, // key
30+
0xFF, 0xFF, 0xFF, 0xFF} // value
31+
1732
emptyGzipMessage = []byte{
1833
97, 79, 149, 90, //CRC
1934
0x00, // magic version byte
@@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
179194
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
180195
}
181196
}
197+
198+
func TestMessageDecodingVersion1(t *testing.T) {
199+
message := Message{Version: 1}
200+
testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
201+
}
202+
203+
func TestMessageDecodingUnknownVersions(t *testing.T) {
204+
message := Message{Version: 2}
205+
err := decode(emptyV2Message, &message)
206+
if err == nil {
207+
t.Error("Decoding did not produce an error for an unknown magic byte")
208+
}
209+
if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
210+
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
211+
}
212+
}

0 commit comments

Comments
 (0)