Skip to content

KAFKA-19110: Add missing unit test for Streams-consumer integration #19457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableList;

/**
* Tracks the state of a single member in relationship to a group:
* <p/>
Expand Down Expand Up @@ -1305,4 +1307,9 @@ public void onAllTasksLostCallbackCompleted(final StreamsOnAllTasksLostCallbackC
future.complete(null);
}
}

// visible for testing
List<MemberStateListener> stateListeners() {
return unmodifiableList(stateUpdatesListeners);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -205,6 +206,13 @@ private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
}

private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
return newConsumerWithStreamRebalanceData(props, null);
}

private AsyncKafkaConsumer<String, String> newConsumerWithStreamRebalanceData(
Properties props,
StreamsRebalanceData streamsRebalanceData
) {
// disable auto-commit by default, so we don't need to handle SyncCommitEvent for each case
if (!props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Expand All @@ -220,7 +228,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
(a, b, c, d, e, f, g) -> fetchCollector,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is pre-existing, but I'm wondering if it's worth doing a follow-up PR to use more meaningful names than a, b etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will handle this in a follow-up PR. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up PR: #19550

(a, b, c, d) -> metadata,
backgroundEventQueue,
Optional.empty()
Optional.ofNullable(streamsRebalanceData)
);
}

Expand Down Expand Up @@ -1371,6 +1379,51 @@ public void testGroupMetadataIsResetAfterUnsubscribe() {
assertEquals(groupMetadataAfterUnsubscribe, consumer.groupMetadata());
}

private Optional<StreamsRebalanceData> captureStreamRebalanceData(final MockedStatic<RequestManagers> requestManagers) {
ArgumentCaptor<Optional<StreamsRebalanceData>> streamRebalanceData = ArgumentCaptor.forClass(Optional.class);
requestManagers.verify(() -> RequestManagers.supplier(
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
any(),
streamRebalanceData.capture()
));
return streamRebalanceData.getValue();
}

@Test
public void testEmptyStreamRebalanceData() {
final String groupId = "consumerGroupA";
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumer(requiredConsumerConfigAndGroupId(groupId));
final Optional<StreamsRebalanceData> groupMetadataUpdateListener = captureStreamRebalanceData(requestManagers);
assertTrue(groupMetadataUpdateListener.isEmpty());
}
}

@Test
public void testStreamRebalanceData() {
final String groupId = "consumerGroupA";
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
final Optional<StreamsRebalanceData> groupMetadataUpdateListener = captureStreamRebalanceData(requestManagers);
assertTrue(groupMetadataUpdateListener.isPresent());
assertEquals(streamsRebalanceData, groupMetadataUpdateListener.get());
}
}

/**
* Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was
* specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@

import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -65,8 +68,53 @@ public void testMemberStateListenerRegistered() {
listener,
Optional.empty()
).get();
requestManagers.consumerMembershipManager.ifPresent(
membershipManager -> assertTrue(membershipManager.stateListeners().contains(listener))
assertTrue(requestManagers.consumerMembershipManager.isPresent());
assertTrue(requestManagers.streamsMembershipManager.isEmpty());
assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isEmpty());

assertEquals(2, requestManagers.consumerMembershipManager.get().stateListeners().size());
assertTrue(requestManagers.consumerMembershipManager.get().stateListeners().stream()
.anyMatch(m -> m instanceof CommitRequestManager));
assertTrue(requestManagers.consumerMembershipManager.get().stateListeners().contains(listener));
}

@Test
public void testStreamMemberStateListenerRegistered() {

final MemberStateListener listener = (memberEpoch, memberId) -> { };

final Properties properties = requiredConsumerConfig();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
final ConsumerConfig config = new ConsumerConfig(properties);
final GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
GroupRebalanceConfig.ProtocolType.CONSUMER
);
final RequestManagers requestManagers = RequestManagers.supplier(
new MockTime(),
new LogContext(),
mock(BackgroundEventHandler.class),
mock(ConsumerMetadata.class),
mock(SubscriptionState.class),
mock(FetchBuffer.class),
config,
groupRebalanceConfig,
mock(ApiVersions.class),
mock(FetchMetricsManager.class),
() -> mock(NetworkClientDelegate.class),
Optional.empty(),
new Metrics(),
mock(OffsetCommitCallbackInvoker.class),
listener,
Optional.of(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()))
).get();
assertTrue(requestManagers.streamsMembershipManager.isPresent());
assertTrue(requestManagers.streamsGroupHeartbeatRequestManager.isPresent());
assertTrue(requestManagers.consumerMembershipManager.isEmpty());

assertEquals(2, requestManagers.streamsMembershipManager.get().stateListeners().size());
assertTrue(requestManagers.streamsMembershipManager.get().stateListeners().stream()
.anyMatch(m -> m instanceof CommitRequestManager));
assertTrue(requestManagers.streamsMembershipManager.get().stateListeners().contains(listener));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,36 @@ public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs
}
}

@Test
public void testResetPollTimer() {
try (final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(Timer.class)) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = pollTimerMockedConstruction.constructed().get(1);

heartbeatRequestManager.resetPollTimer(time.milliseconds());
verify(pollTimer).update(time.milliseconds());
verify(pollTimer).isExpired();
verify(pollTimer).reset(DEFAULT_MAX_POLL_INTERVAL_MS);
}
}

@Test
public void testResetPollTimerWhenExpired() {
try (final MockedConstruction<Timer> pollTimerMockedConstruction = mockConstruction(Timer.class)) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = pollTimerMockedConstruction.constructed().get(1);

when(pollTimer.isExpired()).thenReturn(true);
heartbeatRequestManager.resetPollTimer(time.milliseconds());
verify(pollTimer).update(time.milliseconds());
verify(pollTimer).isExpired();
verify(pollTimer).isExpiredBy();
verify(membershipManager).memberId();
verify(membershipManager).maybeRejoinStaleMember();
verify(pollTimer).reset(DEFAULT_MAX_POLL_INTERVAL_MS);
}
}

private static ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -86,6 +90,8 @@ public class ApplicationEventProcessorTest {
private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
private SubscriptionState subscriptionState = mock(SubscriptionState.class);
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class);
private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class);
private ApplicationEventProcessor processor;

private void setupProcessor(boolean withGroupId) {
Expand All @@ -109,6 +115,27 @@ private void setupProcessor(boolean withGroupId) {
);
}

private void setupStreamProcessor(boolean withGroupId) {
RequestManagers requestManagers = new RequestManagers(
new LogContext(),
offsetsRequestManager,
mock(TopicMetadataRequestManager.class),
mock(FetchRequestManager.class),
withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
withGroupId ? Optional.of(commitRequestManager) : Optional.empty(),
withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(),
Optional.empty(),
withGroupId ? Optional.of(streamsGroupHeartbeatRequestManager) : Optional.empty(),
withGroupId ? Optional.of(streamsMembershipManager) : Optional.empty()
);
processor = new ApplicationEventProcessor(
new LogContext(),
requestManagers,
metadata,
subscriptionState
);
}

@Test
public void testPrepClosingCommitEvents() {
setupProcessor(true);
Expand Down Expand Up @@ -556,6 +583,78 @@ public void testAsyncCommitEventWithException() {
assertFutureThrows(IllegalStateException.class, event.future());
}

@Test
public void testStreamsOnTasksRevokedCallbackCompletedEvent() {
setupStreamProcessor(true);
StreamsOnTasksRevokedCallbackCompletedEvent event =
new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
processor.process(event);
verify(streamsMembershipManager).onTasksRevokedCallbackCompleted(event);
}

@Test
public void testStreamsOnTasksRevokedCallbackCompletedEventWithoutStreamsMembershipManager() {
setupStreamProcessor(false);
StreamsOnTasksRevokedCallbackCompletedEvent event =
new StreamsOnTasksRevokedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
processor.process(event);
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
"of the onTasksRevoked callback execution could not be sent")));
verify(streamsMembershipManager, never()).onTasksRevokedCallbackCompleted(event);
}
}

@Test
public void testStreamsOnTasksAssignedCallbackCompletedEvent() {
setupStreamProcessor(true);
StreamsOnTasksAssignedCallbackCompletedEvent event =
new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
processor.process(event);
verify(streamsMembershipManager).onTasksAssignedCallbackCompleted(event);
}

@Test
public void testStreamsOnTasksAssignedCallbackCompletedEventWithoutStreamsMembershipManager() {
setupStreamProcessor(false);
StreamsOnTasksAssignedCallbackCompletedEvent event =
new StreamsOnTasksAssignedCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
processor.process(event);
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
"of the onTasksAssigned callback execution could not be sent")));
verify(streamsMembershipManager, never()).onTasksAssignedCallbackCompleted(event);
}
}

@Test
public void testStreamsOnAllTasksLostCallbackCompletedEvent() {
setupStreamProcessor(true);
StreamsOnAllTasksLostCallbackCompletedEvent event =
new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
processor.process(event);
verify(streamsMembershipManager).onAllTasksLostCallbackCompleted(event);
}

@Test
public void testStreamsOnAllTasksLostCallbackCompletedEventWithoutStreamsMembershipManager() {
setupStreamProcessor(false);
StreamsOnAllTasksLostCallbackCompletedEvent event =
new StreamsOnAllTasksLostCallbackCompletedEvent(new CompletableFuture<>(), Optional.empty());
try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister()) {
logAppender.setClassLogger(ApplicationEventProcessor.class, Level.WARN);
processor.process(event);
assertTrue(logAppender.getMessages().stream().anyMatch(e ->
e.contains("An internal error occurred; the Streams membership manager was not present, so the notification " +
"of the onAllTasksLost callback execution could not be sent")));
verify(streamsMembershipManager, never()).onAllTasksLostCallbackCompleted(event);
}
}

private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
final Map<String, Object> consumerConfigs) {
if (config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) {
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time.");
throw new IllegalStateException("Named topologies and the STREAMS protocol cannot be used at the same time.");
}
log.info("Streams rebalance protocol enabled");

Expand Down Expand Up @@ -2022,4 +2022,8 @@ Consumer<byte[], byte[]> restoreConsumer() {
Admin adminClient() {
return adminClient;
}

Optional<StreamsRebalanceData> streamsRebalanceData() {
return streamsRebalanceData;
}
}
Loading