Skip to content

Commit 30e7094

Browse files
authored
Merge pull request #1574 from Shopify/diego_zstd-support-kafka-2_1_0_0
Enables zstd (for real this time)
2 parents 7629590 + 37faed7 commit 30e7094

9 files changed

+115
-28
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
4+
#### Unreleased
5+
6+
Improvements:
7+
- Enable zstd compression
8+
([1574](https://github.com/Shopify/sarama/pull/1574),
9+
[1582](https://github.com/Shopify/sarama/pull/1582))
10+
311
#### Version 1.25.0 (2020-01-13)
412

513
New Features:

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,10 @@ func (c *Config) Validate() error {
636636
}
637637
}
638638

639+
if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
640+
return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
641+
}
642+
639643
if c.Producer.Idempotent {
640644
if !c.Version.IsAtLeast(V0_11_0_0) {
641645
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")

config_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,18 @@ func TestLZ4ConfigValidation(t *testing.T) {
405405
}
406406
}
407407

408+
func TestZstdConfigValidation(t *testing.T) {
409+
config := NewConfig()
410+
config.Producer.Compression = CompressionZSTD
411+
if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
412+
t.Error("Expected invalid zstd/kafka version error, got ", err)
413+
}
414+
config.Version = V2_1_0_0
415+
if err := config.Validate(); err != nil {
416+
t.Error("Expected zstd to work, got ", err)
417+
}
418+
}
419+
408420
// This example shows how to integrate with an existing registry as well as publishing metrics
409421
// on the standard output
410422
func ExampleConfig_metrics() {

consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
888888
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
889889
}
890890

891+
if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
892+
request.Version = 10
893+
}
894+
891895
for child := range bc.subscriptions {
892896
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
893897
}

functional_consumer_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,21 @@ func TestVersionMatrixLZ4(t *testing.T) {
106106
consumeMsgs(t, testVersions, producedMessages)
107107
}
108108

109+
// Support for zstd codec was introduced in v2.1.0.0
110+
func TestVersionMatrixZstd(t *testing.T) {
111+
setupFunctionalTest(t)
112+
defer teardownFunctionalTest(t)
113+
114+
// Produce lot's of message with all possible combinations of supported
115+
// protocol versions starting with v2.1.0.0 (first where zstd was supported)
116+
testVersions := versionRange(V2_1_0_0)
117+
allCodecs := []CompressionCodec{CompressionZSTD}
118+
producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
119+
120+
// When/Then
121+
consumeMsgs(t, testVersions, producedMessages)
122+
}
123+
109124
func TestVersionMatrixIdempotent(t *testing.T) {
110125
setupFunctionalTest(t)
111126
defer teardownFunctionalTest(t)

produce_request.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
214214
return V0_10_0_0
215215
case 3:
216216
return V0_11_0_0
217+
case 7:
218+
return V2_1_0_0
217219
default:
218220
return MinVersion
219221
}

produce_response.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,27 @@ import (
55
"time"
66
)
77

8+
// Protocol, http://kafka.apache.org/protocol.html
9+
// v1
10+
// v2 = v3 = v4
11+
// v5 = v6 = v7
12+
// Produce Response (Version: 7) => [responses] throttle_time_ms
13+
// responses => topic [partition_responses]
14+
// topic => STRING
15+
// partition_responses => partition error_code base_offset log_append_time log_start_offset
16+
// partition => INT32
17+
// error_code => INT16
18+
// base_offset => INT64
19+
// log_append_time => INT64
20+
// log_start_offset => INT64
21+
// throttle_time_ms => INT32
22+
23+
// partition_responses in protocol
824
type ProduceResponseBlock struct {
9-
Err KError
10-
Offset int64
11-
// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
12-
Timestamp time.Time
25+
Err KError // v0, error_code
26+
Offset int64 // v0, base_offset
27+
Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
28+
StartOffset int64 // v5, log_start_offset
1329
}
1430

1531
func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
3248
}
3349
}
3450

51+
if version >= 5 {
52+
b.StartOffset, err = pd.getInt64()
53+
if err != nil {
54+
return err
55+
}
56+
}
57+
3558
return nil
3659
}
3760

@@ -49,13 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro
4972
pe.putInt64(timestamp)
5073
}
5174

75+
if version >= 5 {
76+
pe.putInt64(b.StartOffset)
77+
}
78+
5279
return nil
5380
}
5481

5582
type ProduceResponse struct {
56-
Blocks map[string]map[int32]*ProduceResponseBlock
83+
Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
5784
Version int16
58-
ThrottleTime time.Duration // only provided if Version >= 1
85+
ThrottleTime time.Duration // v1, throttle_time_ms
5986
}
6087

6188
func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
129156
}
130157
}
131158
}
159+
132160
if r.Version >= 1 {
133161
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
134162
}
@@ -143,19 +171,6 @@ func (r *ProduceResponse) version() int16 {
143171
return r.Version
144172
}
145173

146-
func (r *ProduceResponse) requiredVersion() KafkaVersion {
147-
switch r.Version {
148-
case 1:
149-
return V0_9_0_0
150-
case 2:
151-
return V0_10_0_0
152-
case 3:
153-
return V0_11_0_0
154-
default:
155-
return MinVersion
156-
}
157-
}
158-
159174
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
160175
if r.Blocks == nil {
161176
return nil

produce_response_test.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ var (
1010
produceResponseNoBlocksV0 = []byte{
1111
0x00, 0x00, 0x00, 0x00}
1212

13-
produceResponseManyBlocksVersions = [][]byte{
14-
{
13+
produceResponseManyBlocksVersions = map[int][]byte{
14+
0: {
1515
0x00, 0x00, 0x00, 0x01,
1616

1717
0x00, 0x03, 'f', 'o', 'o',
@@ -20,7 +20,9 @@ var (
2020
0x00, 0x00, 0x00, 0x01, // Partition 1
2121
0x00, 0x02, // ErrInvalidMessage
2222
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
23-
}, {
23+
},
24+
25+
1: {
2426
0x00, 0x00, 0x00, 0x01,
2527

2628
0x00, 0x03, 'f', 'o', 'o',
@@ -31,7 +33,8 @@ var (
3133
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
3234

3335
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
34-
}, {
36+
},
37+
2: {
3538
0x00, 0x00, 0x00, 0x01,
3639

3740
0x00, 0x03, 'f', 'o', 'o',
@@ -42,6 +45,20 @@ var (
4245
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
4346
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
4447

48+
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
49+
},
50+
7: { // version 7 adds StartOffset
51+
0x00, 0x00, 0x00, 0x01,
52+
53+
0x00, 0x03, 'f', 'o', 'o',
54+
0x00, 0x00, 0x00, 0x01,
55+
56+
0x00, 0x00, 0x00, 0x01, // Partition 1
57+
0x00, 0x02, // ErrInvalidMessage
58+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
59+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
60+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50
61+
4562
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
4663
},
4764
}
@@ -69,14 +86,19 @@ func TestProduceResponseDecode(t *testing.T) {
6986
t.Error("Decoding did not produce a block for foo/1")
7087
} else {
7188
if block.Err != ErrInvalidMessage {
72-
t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
89+
t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
7390
}
7491
if block.Offset != 255 {
7592
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
7693
}
7794
if v >= 2 {
7895
if block.Timestamp != time.Unix(1, 0) {
79-
t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
96+
t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
97+
}
98+
}
99+
if v >= 7 {
100+
if block.StartOffset != 50 {
101+
t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
80102
}
81103
}
82104
}
@@ -95,9 +117,10 @@ func TestProduceResponseEncode(t *testing.T) {
95117

96118
response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
97119
response.Blocks["foo"][1] = &ProduceResponseBlock{
98-
Err: ErrInvalidMessage,
99-
Offset: 255,
100-
Timestamp: time.Unix(1, 0),
120+
Err: ErrInvalidMessage,
121+
Offset: 255,
122+
Timestamp: time.Unix(1, 0),
123+
StartOffset: 50,
101124
}
102125
response.ThrottleTime = 100 * time.Millisecond
103126
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {

produce_set.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
129129
req.Version = 3
130130
}
131131

132+
if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
133+
req.Version = 7
134+
}
135+
132136
for topic, partitionSets := range ps.msgs {
133137
for partition, set := range partitionSets {
134138
if req.Version >= 3 {

0 commit comments

Comments
 (0)