Skip to content

Commit eeb1214

Browse files
authored
KAFKA-18962: Fix onBatchRestored call in GlobalStateManagerImpl (#19188)
Call StateRestoreListener#onBatchRestored with numRestored (the number of records restored in the current batch) rather than totalRestored when reprocessing state. See: https://issues.apache.org/jira/browse/KAFKA-18962. Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Matthias Sax <[email protected]>
1 parent fa62bce commit eeb1214

File tree

5 files changed

+89
-11
lines changed

5 files changed

+89
-11
lines changed

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
106106

107107
<suppress checks="NPathComplexity"
108-
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
108+
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
109109

110110
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
111111
files="CoordinatorClient.java"/>

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+33-6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Collections;
3737
import java.util.HashMap;
3838
import java.util.HashSet;
39+
import java.util.Iterator;
3940
import java.util.LinkedList;
4041
import java.util.List;
4142
import java.util.Map;
@@ -79,6 +80,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
7980
private Uuid clientInstanceId;
8081
private int injectTimeoutExceptionCounter;
8182

83+
private long maxPollRecords = Long.MAX_VALUE;
84+
8285
private final List<KafkaMetric> addedMetrics = new ArrayList<>();
8386

8487
/**
@@ -275,14 +278,22 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
275278
// update the consumed offset
276279
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
277280
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
278-
final List<TopicPartition> toClear = new ArrayList<>();
281+
long numPollRecords = 0L;
282+
283+
final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> partitionsIter = this.records.entrySet().iterator();
284+
while (partitionsIter.hasNext() && numPollRecords < this.maxPollRecords) {
285+
Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = partitionsIter.next();
279286

280-
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
281287
if (!subscriptions.isPaused(entry.getKey())) {
282-
final List<ConsumerRecord<K, V>> recs = entry.getValue();
283-
for (final ConsumerRecord<K, V> rec : recs) {
288+
final Iterator<ConsumerRecord<K, V>> recIterator = entry.getValue().iterator();
289+
while (recIterator.hasNext()) {
290+
if (numPollRecords >= this.maxPollRecords) {
291+
break;
292+
}
284293
long position = subscriptions.position(entry.getKey()).offset;
285294

295+
final ConsumerRecord<K, V> rec = recIterator.next();
296+
286297
if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > position) {
287298
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
288299
}
@@ -294,13 +305,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
294305
rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
295306
subscriptions.position(entry.getKey(), newPosition);
296307
nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
308+
numPollRecords++;
309+
recIterator.remove();
297310
}
298311
}
299-
toClear.add(entry.getKey());
312+
313+
if (entry.getValue().isEmpty()) {
314+
partitionsIter.remove();
315+
}
300316
}
301317
}
302318

303-
toClear.forEach(records::remove);
304319
return new ConsumerRecords<>(results, nextOffsetAndMetadata);
305320
}
306321

@@ -314,6 +329,18 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
314329
recs.add(record);
315330
}
316331

332+
/**
333+
* Sets the maximum number of records returned in a single call to {@link #poll(Duration)}.
334+
*
335+
* @param maxPollRecords the max.poll.records.
336+
*/
337+
public synchronized void setMaxPollRecords(long maxPollRecords) {
338+
if (this.maxPollRecords < 1) {
339+
throw new IllegalArgumentException("MaxPollRecords must be strictly superior to 0");
340+
}
341+
this.maxPollRecords = maxPollRecords;
342+
}
343+
317344
public synchronized void setPollException(KafkaException exception) {
318345
this.pollException = exception;
319346
}

clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Iterator;
3333
import java.util.List;
3434
import java.util.Optional;
35+
import java.util.stream.IntStream;
3536

3637
import static org.junit.jupiter.api.Assertions.assertEquals;
3738
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -202,4 +203,33 @@ public void testRe2JPatternSubscription() {
202203
assertThrows(IllegalStateException.class, () -> consumer.subscribe(List.of("topic1")));
203204
}
204205

206+
// Verifies that poll() returns at most the configured number of records per call and that
// records left over from a capped poll are still returned by later polls.
@Test
207+
public void shouldReturnMaxPollRecords() {
208+
TopicPartition partition = new TopicPartition("test", 0);
209+
consumer.assign(Collections.singleton(partition));
210+
consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
211+
212+
// Queue ten records (offsets 0..9) on the single assigned partition.
IntStream.range(0, 10).forEach(offset -> {
213+
consumer.addRecord(new ConsumerRecord<>("test", 0, offset, null, null));
214+
});
215+
216+
// Cap each poll at two records.
consumer.setMaxPollRecords(2L);
217+
218+
ConsumerRecords<String, String> records;
219+
220+
records = consumer.poll(Duration.ofMillis(1));
221+
assertEquals(2, records.count());
222+
223+
records = consumer.poll(Duration.ofMillis(1));
224+
assertEquals(2, records.count());
225+
226+
// Lift the cap: the remaining six of the ten records should arrive in one poll.
consumer.setMaxPollRecords(Long.MAX_VALUE);
227+
228+
records = consumer.poll(Duration.ofMillis(1));
229+
assertEquals(6, records.count());
230+
231+
// All ten records have been consumed, so nothing is left to return.
records = consumer.poll(Duration.ofMillis(1));
232+
assertTrue(records.isEmpty());
233+
}
234+
205235
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
300300
currentDeadline = NO_DEADLINE;
301301
}
302302

303+
long batchRestoreCount = 0;
303304
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
304305
final ProcessorRecordContext recordContext =
305306
new ProcessorRecordContext(
@@ -318,6 +319,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
318319
record.timestamp(),
319320
record.headers()));
320321
restoreCount++;
322+
batchRestoreCount++;
321323
}
322324
} catch (final Exception deserializationException) {
323325
// while Java distinguishes checked vs unchecked exceptions, other languages
@@ -341,7 +343,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
341343

342344
offset = getGlobalConsumerOffset(topicPartition);
343345

344-
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreCount);
346+
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, batchRestoreCount);
345347
}
346348
stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
347349
checkpointFileCache.put(topicPartition, offset);

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -355,16 +355,35 @@ public void shouldRestoreRecordsUpToHighwatermark() {
355355
assertEquals(2, stateRestoreCallback.restored.size());
356356
}
357357

358+
// Verifies the values reported to the restore listener when the store is registered after
// setUpReprocessing() and the consumer is capped at two records per poll.
@Test
359+
public void shouldListenForRestoreEventsWhenReprocessing() {
360+
setUpReprocessing();
361+
362+
// NOTE(review): presumably seeds 6 records starting at offset 1 for t1 (consistent with the
// start/end offsets asserted below) - confirm against initializeConsumer.
initializeConsumer(6, 1, t1);
363+
// Cap each poll at two records.
consumer.setMaxPollRecords(2L);
364+
365+
stateManager.initialize();
366+
stateManager.registerStore(store1, stateRestoreCallback, null);
367+
368+
// NOTE(review): numBatchRestored presumably holds the count passed to the most recent
// onBatchRestored call, i.e. a per-batch count rather than a running total - confirm against
// the listener used by this test.
assertThat(stateRestoreListener.numBatchRestored, equalTo(2L));
369+
assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
370+
assertThat(stateRestoreListener.restoreEndOffset, equalTo(7L));
371+
assertThat(stateRestoreListener.totalNumRestored, equalTo(6L));
372+
}
373+
358374
@Test
359375
public void shouldListenForRestoreEvents() {
360-
initializeConsumer(5, 1, t1);
376+
initializeConsumer(6, 1, t1);
377+
consumer.setMaxPollRecords(2L);
378+
361379
stateManager.initialize();
362380

363381
stateManager.registerStore(store1, stateRestoreCallback, null);
364382

383+
assertThat(stateRestoreListener.numBatchRestored, equalTo(2L));
365384
assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
366-
assertThat(stateRestoreListener.restoreEndOffset, equalTo(6L));
367-
assertThat(stateRestoreListener.totalNumRestored, equalTo(5L));
385+
assertThat(stateRestoreListener.restoreEndOffset, equalTo(7L));
386+
assertThat(stateRestoreListener.totalNumRestored, equalTo(6L));
368387

369388

370389
assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(store1.name()));

0 commit comments

Comments
 (0)