Skip to content

Commit e666501

Browse files
author
Sergey Zimin
committed
Tests for timestamps added
1 parent f0e5fa7 commit e666501

File tree

4 files changed

+129
-8
lines changed

4 files changed

+129
-8
lines changed

consumer_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,108 @@ func TestConsumerExpiryTicker(t *testing.T) {
985985
broker0.Close()
986986
}
987987

988+
func TestConsumerTimestamps(t *testing.T) {
989+
now := time.Now().Truncate(time.Millisecond)
990+
type testMessage struct {
991+
key Encoder
992+
value Encoder
993+
offset int64
994+
timestamp time.Time
995+
}
996+
for _, d := range []struct {
997+
kversion KafkaVersion
998+
logAppendTime bool
999+
messages []testMessage
1000+
expectedTimestamp []time.Time
1001+
}{
1002+
{V0_8_2_0, false, []testMessage{
1003+
{nil, testMsg, 1, now},
1004+
{nil, testMsg, 2, now},
1005+
}, []time.Time{{}, {}}},
1006+
{V0_10_2_1, false, []testMessage{
1007+
{nil, testMsg, 1, now.Add(time.Second)},
1008+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1009+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1010+
{V0_10_2_1, true, []testMessage{
1011+
{nil, testMsg, 1, now.Add(time.Second)},
1012+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1013+
}, []time.Time{now, now}},
1014+
{V0_11_0_0, false, []testMessage{
1015+
{nil, testMsg, 1, now.Add(time.Second)},
1016+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1017+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1018+
{V0_11_0_0, true, []testMessage{
1019+
{nil, testMsg, 1, now.Add(time.Second)},
1020+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1021+
}, []time.Time{now, now}},
1022+
} {
1023+
var fr *FetchResponse
1024+
var offsetResponseVersion int16
1025+
cfg := NewConfig()
1026+
cfg.Version = d.kversion
1027+
switch d.kversion {
1028+
default:
1029+
fr = &FetchResponse{}
1030+
for _, m := range d.messages {
1031+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
1032+
}
1033+
case V0_10_2_1:
1034+
offsetResponseVersion = 1
1035+
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
1036+
for _, m := range d.messages {
1037+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
1038+
}
1039+
case V0_11_0_0:
1040+
offsetResponseVersion = 1
1041+
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
1042+
for _, m := range d.messages {
1043+
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
1044+
}
1045+
fr.SetLastOffsetDelta("my_topic", 0, 2)
1046+
fr.SetLastStableOffset("my_topic", 0, 2)
1047+
}
1048+
1049+
broker0 := NewMockBroker(t, 0)
1050+
broker0.SetHandlerByMap(map[string]MockResponse{
1051+
"MetadataRequest": NewMockMetadataResponse(t).
1052+
SetBroker(broker0.Addr(), broker0.BrokerID()).
1053+
SetLeader("my_topic", 0, broker0.BrokerID()),
1054+
"OffsetRequest": NewMockOffsetResponse(t).
1055+
SetVersion(offsetResponseVersion).
1056+
SetOffset("my_topic", 0, OffsetNewest, 1234).
1057+
SetOffset("my_topic", 0, OffsetOldest, 0),
1058+
"FetchRequest": NewMockSequence(fr),
1059+
})
1060+
1061+
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
1062+
if err != nil {
1063+
t.Fatal(err)
1064+
}
1065+
1066+
consumer, err := master.ConsumePartition("my_topic", 0, 1)
1067+
if err != nil {
1068+
t.Fatal(err)
1069+
}
1070+
1071+
for i, ts := range d.expectedTimestamp {
1072+
select {
1073+
case msg := <-consumer.Messages():
1074+
assertMessageOffset(t, msg, int64(i)+1)
1075+
if msg.Timestamp != ts {
1076+
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
1077+
d.kversion, d.logAppendTime, msg.Timestamp, ts)
1078+
}
1079+
case err := <-consumer.Errors():
1080+
t.Fatal(err)
1081+
}
1082+
}
1083+
1084+
safeClose(t, consumer)
1085+
safeClose(t, master)
1086+
broker0.Close()
1087+
}
1088+
}
1089+
9881090
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
9891091
if msg.Offset != expectedOffset {
9901092
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

fetch_response.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
186186
}
187187

188188
type FetchResponse struct {
189-
Blocks map[string]map[int32]*FetchResponseBlock
190-
ThrottleTime time.Duration
191-
Version int16 // v1 requires 0.9+, v2 requires 0.10+
189+
Blocks map[string]map[int32]*FetchResponseBlock
190+
ThrottleTime time.Duration
191+
Version int16 // v1 requires 0.9+, v2 requires 0.10+
192+
LogAppendTime bool
193+
Timestamp time.Time
192194
}
193195

194196
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
355357
return kb, vb
356358
}
357359

358-
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
360+
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
359361
frb := r.getOrCreateBlock(topic, partition)
360362
kb, vb := encodeKV(key, value)
361-
msg := &Message{Key: kb, Value: vb}
363+
if r.LogAppendTime {
364+
timestamp = r.Timestamp
365+
}
366+
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
362367
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
363368
if len(frb.RecordsSet) == 0 {
364369
records := newLegacyRecords(&MessageSet{})
@@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
368373
set.Messages = append(set.Messages, msgBlock)
369374
}
370375

371-
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
376+
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
372377
frb := r.getOrCreateBlock(topic, partition)
373378
kb, vb := encodeKV(key, value)
374-
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
375379
if len(frb.RecordsSet) == 0 {
376-
records := newDefaultRecords(&RecordBatch{Version: 2})
380+
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
377381
frb.RecordsSet = []*Records{&records}
378382
}
379383
batch := frb.RecordsSet[0].RecordBatch
384+
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
380385
batch.addRecord(rec)
381386
}
382387

388+
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
389+
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
390+
}
391+
392+
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
393+
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
394+
}
395+
383396
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
384397
frb := r.getOrCreateBlock(topic, partition)
385398
if len(frb.RecordsSet) == 0 {

message.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ func (m *Message) encode(pe packetEncoder) error {
5555
pe.putInt8(m.Version)
5656

5757
attributes := int8(m.Codec) & compressionCodecMask
58+
if m.LogAppendTime {
59+
attributes |= timestampTypeMask
60+
}
5861
pe.putInt8(attributes)
5962

6063
if m.Version >= 1 {

record_batch.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
202202
if b.Control {
203203
attr |= controlMask
204204
}
205+
if b.LogAppendTime {
206+
attr |= timestampTypeMask
207+
}
205208
return attr
206209
}
207210

0 commit comments

Comments
 (0)