Skip to content

Commit 3fae785

Browse files
KAFKA-19110: Add missing unit test for Streams-consumer integration (#19457)
- Construct `AsyncKafkaConsumer` constructor and verify that the `RequestManagers.supplier()` contains Streams-specific data structures. - Verify that `RequestManagers` constructs the Streams request managers correctly - Test `StreamsGroupHeartbeatManager#resetPollTimer()` - Test `StreamsOnTasksRevokedCallbackCompletedEvent`, `StreamsOnTasksAssignedCallbackCompletedEvent`, and `StreamsOnAllTasksLostCallbackCompletedEvent` in `ApplicationEventProcessor` - Test `DefaultStreamsRebalanceListener` - Test `StreamThread`. - Test `handleStreamsRebalanceData`. - Test `StreamsRebalanceData`. Reviewers: Lucas Brutschy <[email protected]>, Bill Bejeck <[email protected]> Signed-off-by: PoAn Yang <[email protected]>
1 parent 8b4560e commit 3fae785

File tree

8 files changed

+786
-5
lines changed

8 files changed

+786
-5
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import java.util.stream.Collectors;
5353
import java.util.stream.Stream;
5454

55+
import static java.util.Collections.unmodifiableList;
56+
5557
/**
5658
* Tracks the state of a single member in relationship to a group:
5759
* <p/>
@@ -1305,4 +1307,9 @@ public void onAllTasksLostCallbackCompleted(final StreamsOnAllTasksLostCallbackC
13051307
future.complete(null);
13061308
}
13071309
}
1310+
1311+
// visible for testing
1312+
List<MemberStateListener> stateListeners() {
1313+
return unmodifiableList(stateUpdatesListeners);
1314+
}
13081315
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

+54-1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import java.util.Set;
118118
import java.util.SortedSet;
119119
import java.util.TreeSet;
120+
import java.util.UUID;
120121
import java.util.concurrent.CompletableFuture;
121122
import java.util.concurrent.CountDownLatch;
122123
import java.util.concurrent.Future;
@@ -205,6 +206,13 @@ private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
205206
}
206207

207208
private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
209+
return newConsumerWithStreamRebalanceData(props, null);
210+
}
211+
212+
private AsyncKafkaConsumer<String, String> newConsumerWithStreamRebalanceData(
213+
Properties props,
214+
StreamsRebalanceData streamsRebalanceData
215+
) {
208216
// disable auto-commit by default, so we don't need to handle SyncCommitEvent for each case
209217
if (!props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
210218
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -220,7 +228,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
220228
(a, b, c, d, e, f, g) -> fetchCollector,
221229
(a, b, c, d) -> metadata,
222230
backgroundEventQueue,
223-
Optional.empty()
231+
Optional.ofNullable(streamsRebalanceData)
224232
);
225233
}
226234

@@ -1371,6 +1379,51 @@ public void testGroupMetadataIsResetAfterUnsubscribe() {
13711379
assertEquals(groupMetadataAfterUnsubscribe, consumer.groupMetadata());
13721380
}
13731381

1382+
private Optional<StreamsRebalanceData> captureStreamRebalanceData(final MockedStatic<RequestManagers> requestManagers) {
1383+
ArgumentCaptor<Optional<StreamsRebalanceData>> streamRebalanceData = ArgumentCaptor.forClass(Optional.class);
1384+
requestManagers.verify(() -> RequestManagers.supplier(
1385+
any(),
1386+
any(),
1387+
any(),
1388+
any(),
1389+
any(),
1390+
any(),
1391+
any(),
1392+
any(),
1393+
any(),
1394+
any(),
1395+
any(),
1396+
any(),
1397+
any(),
1398+
any(),
1399+
any(),
1400+
streamRebalanceData.capture()
1401+
));
1402+
return streamRebalanceData.getValue();
1403+
}
1404+
1405+
@Test
1406+
public void testEmptyStreamRebalanceData() {
1407+
final String groupId = "consumerGroupA";
1408+
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
1409+
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
1410+
final Optional<StreamsRebalanceData> groupMetadataUpdateListener = captureStreamRebalanceData(requestManagers);
1411+
assertTrue(groupMetadataUpdateListener.isEmpty());
1412+
}
1413+
}
1414+
1415+
@Test
1416+
public void testStreamRebalanceData() {
1417+
final String groupId = "consumerGroupA";
1418+
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
1419+
StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
1420+
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
1421+
final Optional<StreamsRebalanceData> groupMetadataUpdateListener = captureStreamRebalanceData(requestManagers);
1422+
assertTrue(groupMetadataUpdateListener.isPresent());
1423+
assertEquals(streamsRebalanceData, groupMetadataUpdateListener.get());
1424+
}
1425+
}
1426+
13741427
/**
13751428
* Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was
13761429
* specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're

clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java

+51-3
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626

2727
import org.junit.jupiter.api.Test;
2828

29+
import java.util.Map;
2930
import java.util.Optional;
3031
import java.util.Properties;
32+
import java.util.UUID;
3133

3234
import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
35+
import static org.junit.jupiter.api.Assertions.assertEquals;
3336
import static org.junit.jupiter.api.Assertions.assertTrue;
3437
import static org.mockito.Mockito.mock;
3538

@@ -65,8 +68,53 @@ public void testMemberStateListenerRegistered() {
6568
listener,
6669
Optional.empty()
6770
).get();
68-
requestManagers.consumerMembershipManager.ifPresent(
69-
membershipManager -> assertTrue(membershipManager.stateListeners().contains(listener))
71+
assertTrue(requestManagers.consumerMembershipManager.isPresent());
72+
assertTrue(requestManagers.streamsMembershipManager.isEmpty());
73+
assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isEmpty());
74+
75+
assertEquals(2, requestManagers.consumerMembershipManager.get().stateListeners().size());
76+
assertTrue(requestManagers.consumerMembershipManager.get().stateListeners().stream()
77+
.anyMatch(m -> m instanceof CommitRequestManager));
78+
assertTrue(requestManagers.consumerMembershipManager.get().stateListeners().contains(listener));
79+
}
80+
81+
@Test
82+
public void testStreamMemberStateListenerRegistered() {
83+
84+
final MemberStateListener listener = (memberEpoch, memberId) -> { };
85+
86+
final Properties properties = requiredConsumerConfig();
87+
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
88+
final ConsumerConfig config = new ConsumerConfig(properties);
89+
final GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
90+
config,
91+
GroupRebalanceConfig.ProtocolType.CONSUMER
7092
);
93+
final RequestManagers requestManagers = RequestManagers.supplier(
94+
new MockTime(),
95+
new LogContext(),
96+
mock(BackgroundEventHandler.class),
97+
mock(ConsumerMetadata.class),
98+
mock(SubscriptionState.class),
99+
mock(FetchBuffer.class),
100+
config,
101+
groupRebalanceConfig,
102+
mock(ApiVersions.class),
103+
mock(FetchMetricsManager.class),
104+
() -> mock(NetworkClientDelegate.class),
105+
Optional.empty(),
106+
new Metrics(),
107+
mock(OffsetCommitCallbackInvoker.class),
108+
listener,
109+
Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()))
110+
).get();
111+
assertTrue(requestManagers.streamsMembershipManager.isPresent());
112+
assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent());
113+
assertTrue(requestManagers.consumerMembershipManager.isEmpty());
114+
115+
assertEquals(2, requestManagers.streamsMembershipManager.get().stateListeners().size());
116+
assertTrue(requestManagers.streamsMembershipManager.get().stateListeners().stream()
117+
.anyMatch(m -> m instanceof CommitRequestManager));
118+
assertTrue(requestManagers.streamsMembershipManager.get().stateListeners().contains(listener));
71119
}
72-
}
120+
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -1520,6 +1520,36 @@ public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs
15201520
}
15211521
}
15221522

1523+
@Test
1524+
public void testResetPollTimer() {
1525+
try (final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(Timer.class)) {
1526+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1527+
final Timer pollTimer = pollTimerMockedConstruction.constructed().get(1);
1528+
1529+
heartbeatRequestManager.resetPollTimer(time.milliseconds());
1530+
verify(pollTimer).update(time.milliseconds());
1531+
verify(pollTimer).isExpired();
1532+
verify(pollTimer).reset(DEFAULT_MAX_POLL_INTERVAL_MS);
1533+
}
1534+
}
1535+
1536+
@Test
1537+
public void testResetPollTimerWhenExpired() {
1538+
try (final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(Timer.class)) {
1539+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1540+
final Timer pollTimer = pollTimerMockedConstruction.constructed().get(1);
1541+
1542+
when(pollTimer.isExpired()).thenReturn(true);
1543+
heartbeatRequestManager.resetPollTimer(time.milliseconds());
1544+
verify(pollTimer).update(time.milliseconds());
1545+
verify(pollTimer).isExpired();
1546+
verify(pollTimer).isExpiredBy();
1547+
verify(membershipManager).memberId();
1548+
verify(membershipManager).maybeRejoinStaleMember();
1549+
verify(pollTimer).reset(DEFAULT_MAX_POLL_INTERVAL_MS);
1550+
}
1551+
}
1552+
15231553
private static ConsumerConfig config() {
15241554
Properties prop = new Properties();
15251555
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java

+99
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,19 @@
3131
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
3232
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
3333
import org.apache.kafka.clients.consumer.internals.RequestManagers;
34+
import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
35+
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
3436
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
3537
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
3638
import org.apache.kafka.common.Cluster;
3739
import org.apache.kafka.common.KafkaException;
3840
import org.apache.kafka.common.TopicPartition;
41+
import org.apache.kafka.common.utils.LogCaptureAppender;
3942
import org.apache.kafka.common.utils.LogContext;
4043
import org.apache.kafka.common.utils.MockTime;
4144
import org.apache.kafka.common.utils.Time;
4245

46+
import org.apache.logging.log4j.Level;
4347
import org.junit.jupiter.api.Test;
4448
import org.junit.jupiter.params.ParameterizedTest;
4549
import org.junit.jupiter.params.provider.Arguments;
@@ -86,6 +90,8 @@ public class ApplicationEventProcessorTest {
8690
private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
8791
private SubscriptionState subscriptionState = mock(SubscriptionState.class);
8892
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
93+
private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class);
94+
private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class);
8995
private ApplicationEventProcessor processor;
9096

9197
private void setupProcessor(boolean withGroupId) {
@@ -109,6 +115,27 @@ private void setupProcessor(boolean withGroupId) {
109115
);
110116
}
111117

118+
private void setupStreamProcessor(boolean withGroupId) {
119+
RequestManagers requestManagers = new RequestManagers(
120+
new LogContext(),
121+
offsetsRequestManager,
122+
mock(TopicMetadataRequestManager.class),
123+
mock(FetchRequestManager.class),
124+
withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
125+
withGroupId ? Optional.of(commitRequestManager) : Optional.empty(),
126+
withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(),
127+
Optional.empty(),
128+
withGroupId ? Optional.of(streamsGroupHeartbeatRequestManager) : Optional.empty(),
129+
withGroupId ? Optional.of(streamsMembershipManager) : Optional.empty()
130+
);
131+
processor = new ApplicationEventProcessor(
132+
new LogContext(),
133+
requestManagers,
134+
metadata,
135+
subscriptionState
136+
);
137+
}
138+
112139
@Test
113140
public void testPrepClosingCommitEvents() {
114141
setupProcessor(true);
@@ -556,6 +583,78 @@ public void testAsyncCommitEventWithException() {
556583
assertFutureThrows(IllegalStateException.class, event.future());
557584
}
558585

586+
@Test
587+
public void testStreamsOnTasksRevokedCallbackCompletedEvent() {
588+
setupStreamProcessor(true);
589+
StreamsOnTasksRevokedCallbackCompletedEvent event =
590+
new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
591+
processor.process(event);
592+
verify(streamsMembershipManager).onTasksRevokedCallbackCompleted(event);
593+
}
594+
595+
@Test
596+
public void testStreamsOnTasksRevokedCallbackCompletedEventWithoutStreamsMembershipManager() {
597+
setupStreamProcessor(false);
598+
StreamsOnTasksRevokedCallbackCompletedEvent event =
599+
new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
600+
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
601+
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
602+
processor.process(event);
603+
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
604+
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
605+
"of the onTasksRevoked callback execution could not be sent")));
606+
verify(streamsMembershipManager, never()).onTasksRevokedCallbackCompleted(event);
607+
}
608+
}
609+
610+
@Test
611+
public void testStreamsOnTasksAssignedCallbackCompletedEvent() {
612+
setupStreamProcessor(true);
613+
StreamsOnTasksAssignedCallbackCompletedEvent event =
614+
new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
615+
processor.process(event);
616+
verify(streamsMembershipManager).onTasksAssignedCallbackCompleted(event);
617+
}
618+
619+
@Test
620+
public void testStreamsOnTasksAssignedCallbackCompletedEventWithoutStreamsMembershipManager() {
621+
setupStreamProcessor(false);
622+
StreamsOnTasksAssignedCallbackCompletedEvent event =
623+
new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
624+
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
625+
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
626+
processor.process(event);
627+
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
628+
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
629+
"of the onTasksAssigned callback execution could not be sent")));
630+
verify(streamsMembershipManager, never()).onTasksAssignedCallbackCompleted(event);
631+
}
632+
}
633+
634+
@Test
635+
public void testStreamsOnAllTasksLostCallbackCompletedEvent() {
636+
setupStreamProcessor(true);
637+
StreamsOnAllTasksLostCallbackCompletedEvent event =
638+
new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
639+
processor.process(event);
640+
verify(streamsMembershipManager).onAllTasksLostCallbackCompleted(event);
641+
}
642+
643+
@Test
644+
public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembershipManager() {
645+
setupStreamProcessor(false);
646+
StreamsOnAllTasksLostCallbackCompletedEvent event =
647+
new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
648+
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
649+
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
650+
processor.process(event);
651+
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
652+
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
653+
"of the onAllTasksLost callback execution could not be sent")));
654+
verify(streamsMembershipManager, never()).onAllTasksLostCallbackCompleted(event);
655+
}
656+
}
657+
559658
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
560659
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
561660
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
534534
final Map<String, Object> consumerConfigs) {
535535
if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) {
536536
if (topologyMetadata.hasNamedTopologies()) {
537-
throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time.");
537+
throw new IllegalStateException("Named topologies and the STREAMS protocol cannot be used at the same time.");
538538
}
539539
log.info("Streams rebalance protocol enabled");
540540

@@ -2022,4 +2022,8 @@ Consumer<byte[], byte[]> restoreConsumer() {
20222022
Admin adminClient() {
20232023
return adminClient;
20242024
}
2025+
2026+
Optional<StreamsRebalanceData> streamsRebalanceData() {
2027+
return streamsRebalanceData;
2028+
}
20252029
}

0 commit comments

Comments
 (0)