Skip to content

Commit a194c39

Browse files
committed
make decoding unknown message versions error
If sarama encounters an unknown message version somehow (e.g. due to a bug in Kafka 0.11's downconversion), right now sarama will carry on decoding the message, resulting in much harder to understand errors. Instead, bail out early with a friendly error message.
1 parent 55b3f6a commit a194c39

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

message.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
128128
}
129129
m.Codec = CompressionCodec(attribute & compressionCodecMask)
130130

131-
if m.Version >= 1 {
131+
if m.Version == 1 {
132132
millis, err := pd.getInt64()
133133
if err != nil {
134134
return err
@@ -144,6 +144,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
144144
m.Timestamp = timestamp
145145
}
146146

147+
if m.Version > 1 {
148+
return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
149+
}
150+
147151
m.Key, err = pd.getBytes()
148152
if err != nil {
149153
return err

message_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,21 @@ var (
1414
0xFF, 0xFF, 0xFF, 0xFF, // key
1515
0xFF, 0xFF, 0xFF, 0xFF} // value
1616

17+
emptyV1Message = []byte{
18+
167, 236, 104, 3, // CRC
19+
0x01, // magic version byte
20+
0x00, // attribute flags
21+
0x00, 0x00, 0x00, 0x00, // timestamp
22+
0xFF, 0xFF, 0xFF, 0xFF, // key
23+
0xFF, 0xFF, 0xFF, 0xFF} // value
24+
25+
emptyV2Message = []byte{
26+
167, 236, 104, 3, // CRC
27+
0x02, // magic version byte
28+
0x00, // attribute flags
29+
0xFF, 0xFF, 0xFF, 0xFF, // key
30+
0xFF, 0xFF, 0xFF, 0xFF} // value
31+
1732
emptyGzipMessage = []byte{
1833
97, 79, 149, 90, //CRC
1934
0x00, // magic version byte
@@ -179,3 +194,19 @@ func TestMessageDecodingBulkLZ4(t *testing.T) {
179194
t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
180195
}
181196
}
197+
198+
func TestMessageDecodingVersion1(t *testing.T) {
199+
message := Message{Version: 1}
200+
testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
201+
}
202+
203+
func TestMessageDecodingUnknownVersions(t *testing.T) {
204+
message := Message{Version: 2}
205+
err := decode(emptyV2Message, &message)
206+
if err == nil {
207+
t.Error("Decoding did not produce an error for an unknown magic byte")
208+
}
209+
if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
210+
t.Error("Decoding an unknown magic byte produced an unknown error ", err)
211+
}
212+
}

0 commit comments

Comments
 (0)