Skip to content

Commit 3c7ed33

Browse files
authored
KAFKA-18397: Added null check before sending background event from ShareConsumeRequestManager. (#18419)
Reviewers: Andrew Schofield <[email protected]>
1 parent 7436159 commit 3c7ed33

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1169,14 +1169,16 @@ class ResultHandler {
11691169
* signal the completion when all results are known.
11701170
*/
11711171
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) {
1172-
if (acknowledgements != null) {
1172+
if (!isCommitAsync && acknowledgements != null) {
11731173
result.put(partition, acknowledgements);
11741174
}
11751175
// For commitAsync, we do not wait for other results to complete, we prepare a background event
11761176
// for every ShareAcknowledgeResponse.
11771177
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
11781178
if (isCommitAsync) {
1179-
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
1179+
if (acknowledgements != null) {
1180+
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
1181+
}
11801182
} else if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
11811183
maybeSendShareAcknowledgeCommitCallbackEvent(result);
11821184
future.ifPresent(future -> future.complete(result));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

+90
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@
9696
import java.util.Optional;
9797
import java.util.Properties;
9898
import java.util.Set;
99+
import java.util.concurrent.CompletableFuture;
99100
import java.util.concurrent.ConcurrentLinkedQueue;
100101
import java.util.concurrent.LinkedBlockingQueue;
102+
import java.util.concurrent.atomic.AtomicInteger;
101103
import java.util.stream.Collectors;
102104
import java.util.stream.Stream;
103105

@@ -548,6 +550,89 @@ public void testAcknowledgeOnCloseWithPendingCommitSync() {
548550
completedAcknowledgements.clear();
549551
}
550552

553+
@Test
554+
public void testResultHandlerOnCommitAsync() {
555+
buildRequestManager();
556+
// Enabling the config so that background event is sent when the acknowledgement response is received.
557+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
558+
559+
Acknowledgements acknowledgements = Acknowledgements.empty();
560+
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
561+
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
562+
acknowledgements.add(3L, AcknowledgeType.REJECT);
563+
564+
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
565+
566+
// Passing null acknowledgements should mean we do not send the background event at all.
567+
resultHandler.complete(tip0, null, true);
568+
assertEquals(0, completedAcknowledgements.size());
569+
570+
// Setting isCommitAsync to false should still not send any background event
571+
// as we have initialized remainingResults to null.
572+
resultHandler.complete(tip0, acknowledgements, false);
573+
assertEquals(0, completedAcknowledgements.size());
574+
575+
// Sending non-null acknowledgements means we do send the background event
576+
resultHandler.complete(tip0, acknowledgements, true);
577+
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
578+
}
579+
580+
@Test
581+
public void testResultHandlerOnCommitSync() {
582+
buildRequestManager();
583+
// Enabling the config so that background event is sent when the acknowledgement response is received.
584+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
585+
586+
Acknowledgements acknowledgements = Acknowledgements.empty();
587+
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
588+
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
589+
acknowledgements.add(3L, AcknowledgeType.REJECT);
590+
591+
final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();
592+
593+
// Initializing resultCount to 3.
594+
AtomicInteger resultCount = new AtomicInteger(3);
595+
596+
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
597+
598+
// We only send the background event after all results have been completed.
599+
resultHandler.complete(tip0, acknowledgements, false);
600+
assertEquals(0, completedAcknowledgements.size());
601+
assertFalse(future.isDone());
602+
603+
resultHandler.complete(t2ip0, null, false);
604+
assertEquals(0, completedAcknowledgements.size());
605+
assertFalse(future.isDone());
606+
607+
// After third response is received, we send the background event.
608+
resultHandler.complete(tip1, acknowledgements, false);
609+
assertEquals(1, completedAcknowledgements.size());
610+
assertEquals(2, completedAcknowledgements.get(0).size());
611+
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
612+
assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
613+
assertTrue(future.isDone());
614+
}
615+
616+
@Test
617+
public void testResultHandlerCompleteIfEmpty() {
618+
buildRequestManager();
619+
620+
final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();
621+
622+
// Initializing resultCount to 1.
623+
AtomicInteger resultCount = new AtomicInteger(1);
624+
625+
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
626+
627+
resultHandler.completeIfEmpty();
628+
assertFalse(future.isDone());
629+
630+
resultCount.decrementAndGet();
631+
632+
resultHandler.completeIfEmpty();
633+
assertTrue(future.isDone());
634+
}
635+
551636
@Test
552637
public void testBatchingAcknowledgeRequestStates() {
553638
buildRequestManager();
@@ -1730,6 +1815,11 @@ private int sendAcknowledgements() {
17301815
return pollResult.unsentRequests.size();
17311816
}
17321817

1818+
public ResultHandler buildResultHandler(final AtomicInteger remainingResults,
1819+
final Optional<CompletableFuture<Map<TopicIdPartition, Acknowledgements>>> future) {
1820+
return new ResultHandler(remainingResults, future);
1821+
}
1822+
17331823
public Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
17341824
return super.requestStates(nodeId);
17351825
}

0 commit comments

Comments
 (0)