Skip to content

Commit e1eda41

Browse files
author
Sergey Zimin
committed
Tests for timestamps added
1 parent f0e5fa7 commit e1eda41

File tree

5 files changed

+153
-13
lines changed

5 files changed

+153
-13
lines changed

consumer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
795795
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
796796
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
797797
}
798+
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
799+
request.Version = 1
800+
}
798801
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
799802
request.Version = 2
800803
}

consumer_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,123 @@ func TestConsumerExpiryTicker(t *testing.T) {
985985
broker0.Close()
986986
}
987987

988+
func TestConsumerTimestamps(t *testing.T) {
989+
now := time.Now().Truncate(time.Millisecond)
990+
type testMessage struct {
991+
key Encoder
992+
value Encoder
993+
offset int64
994+
timestamp time.Time
995+
}
996+
for _, d := range []struct {
997+
kversion KafkaVersion
998+
logAppendTime bool
999+
messages []testMessage
1000+
expectedTimestamp []time.Time
1001+
}{
1002+
{MinVersion, false, []testMessage{
1003+
{nil, testMsg, 1, now},
1004+
{nil, testMsg, 2, now},
1005+
}, []time.Time{{}, {}}},
1006+
{V0_9_0_0, false, []testMessage{
1007+
{nil, testMsg, 1, now},
1008+
{nil, testMsg, 2, now},
1009+
}, []time.Time{{}, {}}},
1010+
{V0_10_0_0, false, []testMessage{
1011+
{nil, testMsg, 1, now},
1012+
{nil, testMsg, 2, now},
1013+
}, []time.Time{{}, {}}},
1014+
{V0_10_2_1, false, []testMessage{
1015+
{nil, testMsg, 1, now.Add(time.Second)},
1016+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1017+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1018+
{V0_10_2_1, true, []testMessage{
1019+
{nil, testMsg, 1, now.Add(time.Second)},
1020+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1021+
}, []time.Time{now, now}},
1022+
{V0_11_0_0, false, []testMessage{
1023+
{nil, testMsg, 1, now.Add(time.Second)},
1024+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1025+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1026+
{V0_11_0_0, true, []testMessage{
1027+
{nil, testMsg, 1, now.Add(time.Second)},
1028+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1029+
}, []time.Time{now, now}},
1030+
} {
1031+
var fr *FetchResponse
1032+
var offsetResponseVersion int16
1033+
cfg := NewConfig()
1034+
cfg.Version = d.kversion
1035+
switch {
1036+
case d.kversion.IsAtLeast(V0_11_0_0):
1037+
offsetResponseVersion = 1
1038+
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
1039+
for _, m := range d.messages {
1040+
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
1041+
}
1042+
fr.SetLastOffsetDelta("my_topic", 0, 2)
1043+
fr.SetLastStableOffset("my_topic", 0, 2)
1044+
case d.kversion.IsAtLeast(V0_10_1_0):
1045+
offsetResponseVersion = 1
1046+
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
1047+
for _, m := range d.messages {
1048+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
1049+
}
1050+
default:
1051+
var version int16
1052+
switch {
1053+
case d.kversion.IsAtLeast(V0_10_0_0):
1054+
version = 2
1055+
case d.kversion.IsAtLeast(V0_9_0_0):
1056+
version = 1
1057+
}
1058+
fr = &FetchResponse{Version: version}
1059+
for _, m := range d.messages {
1060+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
1061+
}
1062+
}
1063+
1064+
broker0 := NewMockBroker(t, 0)
1065+
broker0.SetHandlerByMap(map[string]MockResponse{
1066+
"MetadataRequest": NewMockMetadataResponse(t).
1067+
SetBroker(broker0.Addr(), broker0.BrokerID()).
1068+
SetLeader("my_topic", 0, broker0.BrokerID()),
1069+
"OffsetRequest": NewMockOffsetResponse(t).
1070+
SetVersion(offsetResponseVersion).
1071+
SetOffset("my_topic", 0, OffsetNewest, 1234).
1072+
SetOffset("my_topic", 0, OffsetOldest, 0),
1073+
"FetchRequest": NewMockSequence(fr),
1074+
})
1075+
1076+
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
1077+
if err != nil {
1078+
t.Fatal(err)
1079+
}
1080+
1081+
consumer, err := master.ConsumePartition("my_topic", 0, 1)
1082+
if err != nil {
1083+
t.Fatal(err)
1084+
}
1085+
1086+
for i, ts := range d.expectedTimestamp {
1087+
select {
1088+
case msg := <-consumer.Messages():
1089+
assertMessageOffset(t, msg, int64(i)+1)
1090+
if msg.Timestamp != ts {
1091+
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
1092+
d.kversion, d.logAppendTime, msg.Timestamp, ts)
1093+
}
1094+
case err := <-consumer.Errors():
1095+
t.Fatal(err)
1096+
}
1097+
}
1098+
1099+
safeClose(t, consumer)
1100+
safeClose(t, master)
1101+
broker0.Close()
1102+
}
1103+
}
1104+
9881105
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
9891106
if msg.Offset != expectedOffset {
9901107
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

fetch_response.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
186186
}
187187

188188
type FetchResponse struct {
189-
Blocks map[string]map[int32]*FetchResponseBlock
190-
ThrottleTime time.Duration
191-
Version int16 // v1 requires 0.9+, v2 requires 0.10+
189+
Blocks map[string]map[int32]*FetchResponseBlock
190+
ThrottleTime time.Duration
191+
Version int16 // v1 requires 0.9+, v2 requires 0.10+
192+
LogAppendTime bool
193+
Timestamp time.Time
192194
}
193195

194196
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
355357
return kb, vb
356358
}
357359

358-
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
360+
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
359361
frb := r.getOrCreateBlock(topic, partition)
360362
kb, vb := encodeKV(key, value)
361-
msg := &Message{Key: kb, Value: vb}
363+
if r.LogAppendTime {
364+
timestamp = r.Timestamp
365+
}
366+
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
362367
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
363368
if len(frb.RecordsSet) == 0 {
364369
records := newLegacyRecords(&MessageSet{})
@@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
368373
set.Messages = append(set.Messages, msgBlock)
369374
}
370375

371-
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
376+
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
372377
frb := r.getOrCreateBlock(topic, partition)
373378
kb, vb := encodeKV(key, value)
374-
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
375379
if len(frb.RecordsSet) == 0 {
376-
records := newDefaultRecords(&RecordBatch{Version: 2})
380+
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
377381
frb.RecordsSet = []*Records{&records}
378382
}
379383
batch := frb.RecordsSet[0].RecordBatch
384+
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
380385
batch.addRecord(rec)
381386
}
382387

388+
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
389+
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
390+
}
391+
392+
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
393+
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
394+
}
395+
383396
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
384397
frb := r.getOrCreateBlock(topic, partition)
385398
if len(frb.RecordsSet) == 0 {

message.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"time"
66
)
77

8-
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
9-
type CompressionCodec int8
10-
118
// The lowest 3 bits contain the compression codec used for the message
129
const compressionCodecMask int8 = 0x07
1310

11+
// Bit 3 set for "LogAppend" timestamps
12+
const timestampTypeMask = 0x08
13+
14+
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
15+
type CompressionCodec int8
16+
1417
const (
1518
CompressionNone CompressionCodec = 0
1619
CompressionGZIP CompressionCodec = 1
@@ -19,8 +22,6 @@ const (
1922
CompressionZSTD CompressionCodec = 4
2023
)
2124

22-
const timestampTypeMask = 0x08
23-
2425
func (cc CompressionCodec) String() string {
2526
return []string{
2627
"none",
@@ -55,6 +56,9 @@ func (m *Message) encode(pe packetEncoder) error {
5556
pe.putInt8(m.Version)
5657

5758
attributes := int8(m.Codec) & compressionCodecMask
59+
if m.LogAppendTime {
60+
attributes |= timestampTypeMask
61+
}
5862
pe.putInt8(attributes)
5963

6064
if m.Version >= 1 {

record_batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
202202
if b.Control {
203203
attr |= controlMask
204204
}
205+
if b.LogAppendTime {
206+
attr |= timestampTypeMask
207+
}
205208
return attr
206209
}
207210

0 commit comments

Comments
 (0)