Skip to content

Commit a6c1f7e

Browse files
authored
Merge pull request #1258 from zimin2000/zimin2000.logappend.time
Support LogAppend timestamps
2 parents e775ee1 + e1eda41 commit a6c1f7e

File tree

5 files changed

+167
-13
lines changed

5 files changed

+167
-13
lines changed

consumer.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,9 +487,13 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
487487
for _, msgBlock := range msgSet.Messages {
488488
for _, msg := range msgBlock.Messages() {
489489
offset := msg.Offset
490+
timestamp := msg.Msg.Timestamp
490491
if msg.Msg.Version >= 1 {
491492
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
492493
offset += baseOffset
494+
if msg.Msg.LogAppendTime {
495+
timestamp = msgBlock.Msg.Timestamp
496+
}
493497
}
494498
if offset < child.offset {
495499
continue
@@ -500,7 +504,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
500504
Key: msg.Msg.Key,
501505
Value: msg.Msg.Value,
502506
Offset: offset,
503-
Timestamp: msg.Msg.Timestamp,
507+
Timestamp: timestamp,
504508
BlockTimestamp: msgBlock.Msg.Timestamp,
505509
})
506510
child.offset = offset + 1
@@ -519,13 +523,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
519523
if offset < child.offset {
520524
continue
521525
}
526+
timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
527+
if batch.LogAppendTime {
528+
timestamp = batch.MaxTimestamp
529+
}
522530
messages = append(messages, &ConsumerMessage{
523531
Topic: child.topic,
524532
Partition: child.partition,
525533
Key: rec.Key,
526534
Value: rec.Value,
527535
Offset: offset,
528-
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
536+
Timestamp: timestamp,
529537
Headers: rec.Headers,
530538
})
531539
child.offset = offset + 1
@@ -787,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
787795
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
788796
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
789797
}
798+
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
799+
request.Version = 1
800+
}
790801
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
791802
request.Version = 2
792803
}

consumer_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,123 @@ func TestConsumerExpiryTicker(t *testing.T) {
985985
broker0.Close()
986986
}
987987

988+
func TestConsumerTimestamps(t *testing.T) {
989+
now := time.Now().Truncate(time.Millisecond)
990+
type testMessage struct {
991+
key Encoder
992+
value Encoder
993+
offset int64
994+
timestamp time.Time
995+
}
996+
for _, d := range []struct {
997+
kversion KafkaVersion
998+
logAppendTime bool
999+
messages []testMessage
1000+
expectedTimestamp []time.Time
1001+
}{
1002+
{MinVersion, false, []testMessage{
1003+
{nil, testMsg, 1, now},
1004+
{nil, testMsg, 2, now},
1005+
}, []time.Time{{}, {}}},
1006+
{V0_9_0_0, false, []testMessage{
1007+
{nil, testMsg, 1, now},
1008+
{nil, testMsg, 2, now},
1009+
}, []time.Time{{}, {}}},
1010+
{V0_10_0_0, false, []testMessage{
1011+
{nil, testMsg, 1, now},
1012+
{nil, testMsg, 2, now},
1013+
}, []time.Time{{}, {}}},
1014+
{V0_10_2_1, false, []testMessage{
1015+
{nil, testMsg, 1, now.Add(time.Second)},
1016+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1017+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1018+
{V0_10_2_1, true, []testMessage{
1019+
{nil, testMsg, 1, now.Add(time.Second)},
1020+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1021+
}, []time.Time{now, now}},
1022+
{V0_11_0_0, false, []testMessage{
1023+
{nil, testMsg, 1, now.Add(time.Second)},
1024+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1025+
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1026+
{V0_11_0_0, true, []testMessage{
1027+
{nil, testMsg, 1, now.Add(time.Second)},
1028+
{nil, testMsg, 2, now.Add(2 * time.Second)},
1029+
}, []time.Time{now, now}},
1030+
} {
1031+
var fr *FetchResponse
1032+
var offsetResponseVersion int16
1033+
cfg := NewConfig()
1034+
cfg.Version = d.kversion
1035+
switch {
1036+
case d.kversion.IsAtLeast(V0_11_0_0):
1037+
offsetResponseVersion = 1
1038+
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
1039+
for _, m := range d.messages {
1040+
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
1041+
}
1042+
fr.SetLastOffsetDelta("my_topic", 0, 2)
1043+
fr.SetLastStableOffset("my_topic", 0, 2)
1044+
case d.kversion.IsAtLeast(V0_10_1_0):
1045+
offsetResponseVersion = 1
1046+
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
1047+
for _, m := range d.messages {
1048+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
1049+
}
1050+
default:
1051+
var version int16
1052+
switch {
1053+
case d.kversion.IsAtLeast(V0_10_0_0):
1054+
version = 2
1055+
case d.kversion.IsAtLeast(V0_9_0_0):
1056+
version = 1
1057+
}
1058+
fr = &FetchResponse{Version: version}
1059+
for _, m := range d.messages {
1060+
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
1061+
}
1062+
}
1063+
1064+
broker0 := NewMockBroker(t, 0)
1065+
broker0.SetHandlerByMap(map[string]MockResponse{
1066+
"MetadataRequest": NewMockMetadataResponse(t).
1067+
SetBroker(broker0.Addr(), broker0.BrokerID()).
1068+
SetLeader("my_topic", 0, broker0.BrokerID()),
1069+
"OffsetRequest": NewMockOffsetResponse(t).
1070+
SetVersion(offsetResponseVersion).
1071+
SetOffset("my_topic", 0, OffsetNewest, 1234).
1072+
SetOffset("my_topic", 0, OffsetOldest, 0),
1073+
"FetchRequest": NewMockSequence(fr),
1074+
})
1075+
1076+
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
1077+
if err != nil {
1078+
t.Fatal(err)
1079+
}
1080+
1081+
consumer, err := master.ConsumePartition("my_topic", 0, 1)
1082+
if err != nil {
1083+
t.Fatal(err)
1084+
}
1085+
1086+
for i, ts := range d.expectedTimestamp {
1087+
select {
1088+
case msg := <-consumer.Messages():
1089+
assertMessageOffset(t, msg, int64(i)+1)
1090+
if msg.Timestamp != ts {
1091+
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
1092+
d.kversion, d.logAppendTime, msg.Timestamp, ts)
1093+
}
1094+
case err := <-consumer.Errors():
1095+
t.Fatal(err)
1096+
}
1097+
}
1098+
1099+
safeClose(t, consumer)
1100+
safeClose(t, master)
1101+
broker0.Close()
1102+
}
1103+
}
1104+
9881105
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
9891106
if msg.Offset != expectedOffset {
9901107
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

fetch_response.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
186186
}
187187

188188
type FetchResponse struct {
189-
Blocks map[string]map[int32]*FetchResponseBlock
190-
ThrottleTime time.Duration
191-
Version int16 // v1 requires 0.9+, v2 requires 0.10+
189+
Blocks map[string]map[int32]*FetchResponseBlock
190+
ThrottleTime time.Duration
191+
Version int16 // v1 requires 0.9+, v2 requires 0.10+
192+
LogAppendTime bool
193+
Timestamp time.Time
192194
}
193195

194196
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
355357
return kb, vb
356358
}
357359

358-
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
360+
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
359361
frb := r.getOrCreateBlock(topic, partition)
360362
kb, vb := encodeKV(key, value)
361-
msg := &Message{Key: kb, Value: vb}
363+
if r.LogAppendTime {
364+
timestamp = r.Timestamp
365+
}
366+
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
362367
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
363368
if len(frb.RecordsSet) == 0 {
364369
records := newLegacyRecords(&MessageSet{})
@@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
368373
set.Messages = append(set.Messages, msgBlock)
369374
}
370375

371-
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
376+
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
372377
frb := r.getOrCreateBlock(topic, partition)
373378
kb, vb := encodeKV(key, value)
374-
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
375379
if len(frb.RecordsSet) == 0 {
376-
records := newDefaultRecords(&RecordBatch{Version: 2})
380+
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
377381
frb.RecordsSet = []*Records{&records}
378382
}
379383
batch := frb.RecordsSet[0].RecordBatch
384+
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
380385
batch.addRecord(rec)
381386
}
382387

388+
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
389+
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
390+
}
391+
392+
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
393+
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
394+
}
395+
383396
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
384397
frb := r.getOrCreateBlock(topic, partition)
385398
if len(frb.RecordsSet) == 0 {

message.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"time"
66
)
77

8-
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
9-
type CompressionCodec int8
10-
118
// The lowest 3 bits contain the compression codec used for the message
129
const compressionCodecMask int8 = 0x07
1310

11+
// Bit 3 set for "LogAppend" timestamps
12+
const timestampTypeMask = 0x08
13+
14+
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
15+
type CompressionCodec int8
16+
1417
const (
1518
CompressionNone CompressionCodec = 0
1619
CompressionGZIP CompressionCodec = 1
@@ -36,6 +39,7 @@ const CompressionLevelDefault = -1000
3639
type Message struct {
3740
Codec CompressionCodec // codec used to compress the message contents
3841
CompressionLevel int // compression level
42+
LogAppendTime bool // the used timestamp is LogAppendTime
3943
Key []byte // the message key, may be nil
4044
Value []byte // the message contents
4145
Set *MessageSet // the message set a message might wrap
@@ -52,6 +56,9 @@ func (m *Message) encode(pe packetEncoder) error {
5256
pe.putInt8(m.Version)
5357

5458
attributes := int8(m.Codec) & compressionCodecMask
59+
if m.LogAppendTime {
60+
attributes |= timestampTypeMask
61+
}
5562
pe.putInt8(attributes)
5663

5764
if m.Version >= 1 {
@@ -108,6 +115,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
108115
return err
109116
}
110117
m.Codec = CompressionCodec(attribute & compressionCodecMask)
118+
m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
111119

112120
if m.Version == 1 {
113121
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {

record_batch.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type RecordBatch struct {
3636
Codec CompressionCodec
3737
CompressionLevel int
3838
Control bool
39+
LogAppendTime bool
3940
LastOffsetDelta int32
4041
FirstTimestamp time.Time
4142
MaxTimestamp time.Time
@@ -120,6 +121,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
120121
}
121122
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
122123
b.Control = attributes&controlMask == controlMask
124+
b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
123125

124126
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
125127
return err
@@ -200,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
200202
if b.Control {
201203
attr |= controlMask
202204
}
205+
if b.LogAppendTime {
206+
attr |= timestampTypeMask
207+
}
203208
return attr
204209
}
205210

0 commit comments

Comments
 (0)