@@ -5,11 +5,27 @@ import (
5
5
"time"
6
6
)
7
7
8
+ // Protocol, http://kafka.apache.org/protocol.html
9
+ // v1
10
+ // v2 = v3 = v4
11
+ // v5 = v6 = v7
12
+ // Produce Response (Version: 7) => [responses] throttle_time_ms
13
+ // responses => topic [partition_responses]
14
+ // topic => STRING
15
+ // partition_responses => partition error_code base_offset log_append_time log_start_offset
16
+ // partition => INT32
17
+ // error_code => INT16
18
+ // base_offset => INT64
19
+ // log_append_time => INT64
20
+ // log_start_offset => INT64
21
+ // throttle_time_ms => INT32
22
+
23
+ // partition_responses in protocol
8
24
type ProduceResponseBlock struct {
9
- Err KError
10
- Offset int64
11
- // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
12
- Timestamp time. Time
25
+ Err KError // v0, error_code
26
+ Offset int64 // v0, base_offset
27
+ Timestamp time. Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
28
+ StartOffset int64 // v5, log_start_offset
13
29
}
14
30
15
31
func (b * ProduceResponseBlock ) decode (pd packetDecoder , version int16 ) (err error ) {
@@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
32
48
}
33
49
}
34
50
51
+ if version >= 5 {
52
+ b .StartOffset , err = pd .getInt64 ()
53
+ if err != nil {
54
+ return err
55
+ }
56
+ }
57
+
35
58
return nil
36
59
}
37
60
@@ -49,14 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro
49
72
pe .putInt64 (timestamp )
50
73
}
51
74
75
+ if version >= 5 {
76
+ pe .putInt64 (b .StartOffset )
77
+ }
78
+
52
79
return nil
53
80
}
54
81
55
82
type ProduceResponse struct {
56
- Blocks map [string ]map [int32 ]* ProduceResponseBlock
83
+ Blocks map [string ]map [int32 ]* ProduceResponseBlock // v0, responses
57
84
Version int16
58
- ThrottleTime time.Duration // only provided if Version >= 1
59
- StartOffset int64 // only provided if Version >= 5
85
+ ThrottleTime time.Duration // v1, throttle_time_ms
60
86
}
61
87
62
88
func (r * ProduceResponse ) decode (pd packetDecoder , version int16 ) (err error ) {
@@ -96,13 +122,6 @@ func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
96
122
}
97
123
}
98
124
99
- if version >= 5 {
100
- r .StartOffset , err = pd .getInt64 ()
101
- if err != nil {
102
- return err
103
- }
104
- }
105
-
106
125
if r .Version >= 1 {
107
126
millis , err := pd .getInt32 ()
108
127
if err != nil {
@@ -137,9 +156,6 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
137
156
}
138
157
}
139
158
}
140
- if r .Version >= 5 {
141
- pe .putInt64 (r .StartOffset )
142
- }
143
159
144
160
if r .Version >= 1 {
145
161
pe .putInt32 (int32 (r .ThrottleTime / time .Millisecond ))
0 commit comments