File tree Expand file tree Collapse file tree 2 files changed +7
-0
lines changed Expand file tree Collapse file tree 2 files changed +7
-0
lines changed Original file line number Diff line number Diff line change @@ -164,9 +164,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
164
164
rb := set .recordsToSend .RecordBatch
165
165
if len (rb .Records ) > 0 {
166
166
rb .LastOffsetDelta = int32 (len (rb .Records ) - 1 )
167
+ var maxTimestampDelta time.Duration
167
168
for i , record := range rb .Records {
168
169
record .OffsetDelta = int64 (i )
170
+ maxTimestampDelta = max (maxTimestampDelta , record .TimestampDelta )
169
171
}
172
+ // Also set the MaxTimestamp similar to the
173
+ rb .MaxTimestamp = rb .FirstTimestamp .Add (maxTimestampDelta )
170
174
}
171
175
172
176
// Set the batch as transactional when a transactionalID is set
Original file line number Diff line number Diff line change @@ -260,6 +260,9 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
260
260
if ! batch .FirstTimestamp .Equal (now .Truncate (time .Millisecond )) {
261
261
t .Errorf ("Wrong first timestamp: %v" , batch .FirstTimestamp )
262
262
}
263
+ if ! batch .MaxTimestamp .Equal (now .Add (9 * time .Second ).Truncate (time .Millisecond )) {
264
+ t .Errorf ("Wrong max timestamp: %v" , batch .MaxTimestamp )
265
+ }
263
266
for i := 0 ; i < 10 ; i ++ {
264
267
rec := batch .Records [i ]
265
268
if rec .TimestampDelta != time .Duration (i )* time .Second {
You can’t perform that action at this time.
0 commit comments