Skip to content

Commit ca55d05

Browse files
author
Sergey Zimin
committed
Consumer timestamp test adjusted
1 parent e666501 commit ca55d05

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

consumer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
795795
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
796796
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
797797
}
798+
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
799+
request.Version = 1
800+
}
798801
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
799802
request.Version = 2
800803
}

consumer_test.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,15 @@ func TestConsumerTimestamps(t *testing.T) {
999999
messages []testMessage
10001000
expectedTimestamp []time.Time
10011001
}{
1002-
{V0_8_2_0, false, []testMessage{
1002+
{MinVersion, false, []testMessage{
1003+
{nil, testMsg, 1, now},
1004+
{nil, testMsg, 2, now},
1005+
}, []time.Time{{}, {}}},
1006+
{V0_9_0_0, false, []testMessage{
1007+
{nil, testMsg, 1, now},
1008+
{nil, testMsg, 2, now},
1009+
}, []time.Time{{}, {}}},
1010+
{V0_10_0_0, false, []testMessage{
10031011
{nil, testMsg, 1, now},
10041012
{nil, testMsg, 2, now},
10051013
}, []time.Time{{}, {}}},
@@ -1024,26 +1032,33 @@ func TestConsumerTimestamps(t *testing.T) {
10241032
var offsetResponseVersion int16
10251033
cfg := NewConfig()
10261034
cfg.Version = d.kversion
1027-
switch d.kversion {
1028-
default:
1029-
fr = &FetchResponse{}
1035+
switch {
1036+
case d.kversion.IsAtLeast(V0_11_0_0):
1037+
offsetResponseVersion = 1
1038+
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
10301039
for _, m := range d.messages {
1031-
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
1040+
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
10321041
}
1033-
case V0_10_2_1:
1042+
fr.SetLastOffsetDelta("my_topic", 0, 2)
1043+
fr.SetLastStableOffset("my_topic", 0, 2)
1044+
case d.kversion.IsAtLeast(V0_10_1_0):
10341045
offsetResponseVersion = 1
10351046
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
10361047
for _, m := range d.messages {
10371048
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
10381049
}
1039-
case V0_11_0_0:
1040-
offsetResponseVersion = 1
1041-
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
1050+
default:
1051+
var version int16
1052+
switch {
1053+
case d.kversion.IsAtLeast(V0_10_0_0):
1054+
version = 2
1055+
case d.kversion.IsAtLeast(V0_9_0_0):
1056+
version = 1
1057+
}
1058+
fr = &FetchResponse{Version: version}
10421059
for _, m := range d.messages {
1043-
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
1060+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
10441061
}
1045-
fr.SetLastOffsetDelta("my_topic", 0, 2)
1046-
fr.SetLastStableOffset("my_topic", 0, 2)
10471062
}
10481063

10491064
broker0 := NewMockBroker(t, 0)

message.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"time"
66
)
77

8-
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
9-
type CompressionCodec int8
10-
118
// The lowest 3 bits contain the compression codec used for the message
129
const compressionCodecMask int8 = 0x07
1310

11+
// Bit 3 set for "LogAppend" timestamps
12+
const timestampTypeMask = 0x08
13+
14+
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
15+
type CompressionCodec int8
16+
1417
const (
1518
CompressionNone CompressionCodec = 0
1619
CompressionGZIP CompressionCodec = 1
@@ -19,8 +22,6 @@ const (
1922
CompressionZSTD CompressionCodec = 4
2023
)
2124

22-
const timestampTypeMask = 0x08
23-
2425
func (cc CompressionCodec) String() string {
2526
return []string{
2627
"none",

0 commit comments

Comments
 (0)