Skip to content

Commit 37faed7

Browse files
committed
enables zstd bumping the right things
Zstd support was initially added to sarama before https://issues.apache.org/jira/browse/KAFKA-4514 https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression was done. The final release added some changes like bumping the produce and fetch requests to only allow new clients to use zstd. This PR tries to do that, however, there are some other protocol changes that are not addressed on this PR, and I'm not sure what would be the effect of bumping the produce and fetch requests without filling the protocol gaps.
1 parent ab4036c commit 37faed7

9 files changed

+115
-28
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
4+
#### Unreleased
5+
6+
Improvements:
7+
- Enable zstd compression
8+
([1574](https://github.com/Shopify/sarama/pull/1574),
9+
[1582](https://github.com/Shopify/sarama/pull/1582))
10+
311
#### Version 1.25.0 (2020-01-13)
412

513
New Features:

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ func (c *Config) Validate() error {
629629
}
630630
}
631631

632+
if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
633+
return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
634+
}
635+
632636
if c.Producer.Idempotent {
633637
if !c.Version.IsAtLeast(V0_11_0_0) {
634638
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")

config_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,18 @@ func TestLZ4ConfigValidation(t *testing.T) {
405405
}
406406
}
407407

408+
func TestZstdConfigValidation(t *testing.T) {
409+
config := NewConfig()
410+
config.Producer.Compression = CompressionZSTD
411+
if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
412+
t.Error("Expected invalid zstd/kafka version error, got ", err)
413+
}
414+
config.Version = V2_1_0_0
415+
if err := config.Validate(); err != nil {
416+
t.Error("Expected zstd to work, got ", err)
417+
}
418+
}
419+
408420
// This example shows how to integrate with an existing registry as well as publishing metrics
409421
// on the standard output
410422
func ExampleConfig_metrics() {

consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
888888
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
889889
}
890890

891+
if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
892+
request.Version = 10
893+
}
894+
891895
for child := range bc.subscriptions {
892896
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
893897
}

functional_consumer_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,21 @@ func TestVersionMatrixLZ4(t *testing.T) {
106106
consumeMsgs(t, testVersions, producedMessages)
107107
}
108108

109+
// Support for zstd codec was introduced in v2.1.0.0
110+
func TestVersionMatrixZstd(t *testing.T) {
111+
setupFunctionalTest(t)
112+
defer teardownFunctionalTest(t)
113+
114+
// Produce lot's of message with all possible combinations of supported
115+
// protocol versions starting with v2.1.0.0 (first where zstd was supported)
116+
testVersions := versionRange(V2_1_0_0)
117+
allCodecs := []CompressionCodec{CompressionZSTD}
118+
producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
119+
120+
// When/Then
121+
consumeMsgs(t, testVersions, producedMessages)
122+
}
123+
109124
func TestVersionMatrixIdempotent(t *testing.T) {
110125
setupFunctionalTest(t)
111126
defer teardownFunctionalTest(t)

produce_request.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
214214
return V0_10_0_0
215215
case 3:
216216
return V0_11_0_0
217+
case 7:
218+
return V2_1_0_0
217219
default:
218220
return MinVersion
219221
}

produce_response.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,27 @@ import (
55
"time"
66
)
77

8+
// Protocol, http://kafka.apache.org/protocol.html
9+
// v1
10+
// v2 = v3 = v4
11+
// v5 = v6 = v7
12+
// Produce Response (Version: 7) => [responses] throttle_time_ms
13+
// responses => topic [partition_responses]
14+
// topic => STRING
15+
// partition_responses => partition error_code base_offset log_append_time log_start_offset
16+
// partition => INT32
17+
// error_code => INT16
18+
// base_offset => INT64
19+
// log_append_time => INT64
20+
// log_start_offset => INT64
21+
// throttle_time_ms => INT32
22+
23+
// partition_responses in protocol
824
type ProduceResponseBlock struct {
9-
Err KError
10-
Offset int64
11-
// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
12-
Timestamp time.Time
25+
Err KError // v0, error_code
26+
Offset int64 // v0, base_offset
27+
Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
28+
StartOffset int64 // v5, log_start_offset
1329
}
1430

1531
func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
3248
}
3349
}
3450

51+
if version >= 5 {
52+
b.StartOffset, err = pd.getInt64()
53+
if err != nil {
54+
return err
55+
}
56+
}
57+
3558
return nil
3659
}
3760

@@ -49,13 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro
4972
pe.putInt64(timestamp)
5073
}
5174

75+
if version >= 5 {
76+
pe.putInt64(b.StartOffset)
77+
}
78+
5279
return nil
5380
}
5481

5582
type ProduceResponse struct {
56-
Blocks map[string]map[int32]*ProduceResponseBlock
83+
Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
5784
Version int16
58-
ThrottleTime time.Duration // only provided if Version >= 1
85+
ThrottleTime time.Duration // v1, throttle_time_ms
5986
}
6087

6188
func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
129156
}
130157
}
131158
}
159+
132160
if r.Version >= 1 {
133161
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
134162
}
@@ -143,19 +171,6 @@ func (r *ProduceResponse) version() int16 {
143171
return r.Version
144172
}
145173

146-
func (r *ProduceResponse) requiredVersion() KafkaVersion {
147-
switch r.Version {
148-
case 1:
149-
return V0_9_0_0
150-
case 2:
151-
return V0_10_0_0
152-
case 3:
153-
return V0_11_0_0
154-
default:
155-
return MinVersion
156-
}
157-
}
158-
159174
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
160175
if r.Blocks == nil {
161176
return nil

produce_response_test.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ var (
1010
produceResponseNoBlocksV0 = []byte{
1111
0x00, 0x00, 0x00, 0x00}
1212

13-
produceResponseManyBlocksVersions = [][]byte{
14-
{
13+
produceResponseManyBlocksVersions = map[int][]byte{
14+
0: {
1515
0x00, 0x00, 0x00, 0x01,
1616

1717
0x00, 0x03, 'f', 'o', 'o',
@@ -20,7 +20,9 @@ var (
2020
0x00, 0x00, 0x00, 0x01, // Partition 1
2121
0x00, 0x02, // ErrInvalidMessage
2222
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
23-
}, {
23+
},
24+
25+
1: {
2426
0x00, 0x00, 0x00, 0x01,
2527

2628
0x00, 0x03, 'f', 'o', 'o',
@@ -31,7 +33,8 @@ var (
3133
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
3234

3335
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
34-
}, {
36+
},
37+
2: {
3538
0x00, 0x00, 0x00, 0x01,
3639

3740
0x00, 0x03, 'f', 'o', 'o',
@@ -42,6 +45,20 @@ var (
4245
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
4346
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
4447

48+
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
49+
},
50+
7: { // version 7 adds StartOffset
51+
0x00, 0x00, 0x00, 0x01,
52+
53+
0x00, 0x03, 'f', 'o', 'o',
54+
0x00, 0x00, 0x00, 0x01,
55+
56+
0x00, 0x00, 0x00, 0x01, // Partition 1
57+
0x00, 0x02, // ErrInvalidMessage
58+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
59+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
60+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50
61+
4562
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
4663
},
4764
}
@@ -69,14 +86,19 @@ func TestProduceResponseDecode(t *testing.T) {
6986
t.Error("Decoding did not produce a block for foo/1")
7087
} else {
7188
if block.Err != ErrInvalidMessage {
72-
t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
89+
t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
7390
}
7491
if block.Offset != 255 {
7592
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
7693
}
7794
if v >= 2 {
7895
if block.Timestamp != time.Unix(1, 0) {
79-
t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
96+
t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
97+
}
98+
}
99+
if v >= 7 {
100+
if block.StartOffset != 50 {
101+
t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
80102
}
81103
}
82104
}
@@ -95,9 +117,10 @@ func TestProduceResponseEncode(t *testing.T) {
95117

96118
response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
97119
response.Blocks["foo"][1] = &ProduceResponseBlock{
98-
Err: ErrInvalidMessage,
99-
Offset: 255,
100-
Timestamp: time.Unix(1, 0),
120+
Err: ErrInvalidMessage,
121+
Offset: 255,
122+
Timestamp: time.Unix(1, 0),
123+
StartOffset: 50,
101124
}
102125
response.ThrottleTime = 100 * time.Millisecond
103126
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {

produce_set.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
129129
req.Version = 3
130130
}
131131

132+
if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
133+
req.Version = 7
134+
}
135+
132136
for topic, partitionSets := range ps.msgs {
133137
for partition, set := range partitionSets {
134138
if req.Version >= 3 {

0 commit comments

Comments
 (0)