KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer #19886


Open: wants to merge 5 commits into trunk
Conversation

ShivsundarR (Collaborator) commented Jun 3, 2025

https://issues.apache.org/jira/browse/KAFKA-17853

  • There is an issue with the console share consumer: if the broker is
    unavailable, the consumer does not shut down immediately even after
    force terminating with ctrl-c. It takes around 30 seconds to close
    once the broker shuts down.

  • The console consumer, on the other hand, was reported to shut down
    immediately on ctrl-c. On reproducing the issue with a local Kafka
    server, however, I observed that the issue was present in both the
    console consumer and the console share consumer.

Issue:

  • The client debug logs suggested the issue was the network thread
    sending repeated FindCoordinator requests until the timer expired.
    This was happening in both the console-consumer and the
    console-share-consumer.

  • Debug logs showed that when the broker is shut down, the heartbeat
    fails with a DisconnectException (which is retriable). This triggers a
    FindCoordinator request on the network thread, which retries until the
    default timeout expires.

  • This request is sent even before we trigger a close on the consumer,
    so once we press ctrl-c, although ConsumerNetworkThread::close() is
    triggered, it waits for the default timeout until all the requests
    are sent out for a graceful shutdown.

This PR fixes the issue by adding a check in NetworkClientDelegate to
remove any pending unsent requests (those with empty node values) during
close. This avoids unnecessary retries, and the consumers shut down
immediately upon termination.
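The core of the fix can be sketched as follows. This is a hypothetical, simplified stand-in (UnsentRequestQueue and its methods are illustrative names, not the real NetworkClientDelegate internals): when the client is closing, unsent requests that have no target node are dropped instead of being retried until the close timeout expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class UnsentRequestQueue {

    // Stand-in for NetworkClientDelegate.UnsentRequest: an API name plus
    // the (possibly empty) node the request would be sent to.
    record UnsentRequest(String api, Optional<String> node) {}

    private final List<UnsentRequest> unsent = new ArrayList<>();

    void add(UnsentRequest request) {
        unsent.add(request);
    }

    // Called during close: a request with no node to connect to cannot
    // succeed before shutdown, so remove it instead of retrying.
    void removeNodelessRequestsOnClose() {
        unsent.removeIf(request -> request.node().isEmpty());
    }

    int pendingCount() {
        return unsent.size();
    }
}
```

With this check in place, a queued FindCoordinator request that has no node left to talk to is discarded at close time, while requests that already have a destination are still sent out.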

Share consumers shutting down after the fix.

[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8,
node=Optional.empty, remainingMs=28565} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
FindCoordinator request failed due to retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closing
RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closed
the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Kafka
share consumer has been closed
(org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages

Regular consumers shutting down after the fix.

[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b,
node=Optional.empty, remainingMs=29160} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] FindCoordinator request failed due to
retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closing RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing test-topic-23-0 from buffered
fetch data as it is not in the set of partitions to retain ([])
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closed the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Kafka consumer has been closed
(org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages

@ShivsundarR ShivsundarR requested a review from lianetm June 3, 2025 10:55
@ShivsundarR ShivsundarR added ci-approved and removed triage PRs from the community labels Jun 3, 2025
@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Jun 3, 2025
AndrewJSchofield (Member) left a comment

Thanks for the PR. I like the approach but want to review more deeply. A few comments to address.

apoorvmittal10 (Contributor)

Great to see the fix in the area, thanks for looking into this.

ShivsundarR (Collaborator, Author)

Another edge case here:
If the broker was never started and we attempted ctrl-c to close the consumer, it still took 30 seconds for the ConsoleShareConsumer to close. The problem lay with the ShareConsumeRequestManager, where we do not complete the closeFuture when the memberId is null.
I have added a fix for this and verified that both the ConsoleConsumer and the ConsoleShareConsumer work consistently and close even if the broker never started.

lianetm (Member) commented Jun 11, 2025

Hey, thanks for looking into this! Sorry I haven't had the time to look in detail, but just a couple of high-level comments:

This means "close with the default close timeout of 30s". So in a way, it's not unexpected that it waits, right? We should ensure that's what we want from the console consumers. E.g. calling close(ZERO) would be the way if we want to "close and attempt sending all requests once without retrying/waiting".
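The close-timeout semantics being discussed can be sketched like this. This is an illustrative model, not the actual client internals: a zero budget attempts every pending request exactly once and never waits for retries, while a positive budget keeps retrying failures until the deadline passes.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BooleanSupplier;

class GracefulCloser {

    // Each pending request is modeled as an attempt that reports success.
    private final Deque<BooleanSupplier> pending = new ArrayDeque<>();

    void enqueue(BooleanSupplier attempt) {
        pending.add(attempt);
    }

    // Returns how many requests were still unsent when the budget ran out.
    int close(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();

        // First pass: try every request once, even with a zero budget.
        int firstPass = pending.size();
        for (int i = 0; i < firstPass; i++) {
            BooleanSupplier attempt = pending.poll();
            if (!attempt.getAsBoolean()) {
                pending.add(attempt); // keep it in case there is time to retry
            }
        }

        // Retry failures only while time remains in the close budget.
        // (A real client would wait between retries rather than spin.)
        while (!pending.isEmpty() && System.nanoTime() < deadline) {
            BooleanSupplier attempt = pending.poll();
            if (!attempt.getAsBoolean()) {
                pending.add(attempt);
            }
        }
        return pending.size();
    }
}
```

Under this model, close(Duration.ZERO) gives the "attempt once, no waiting" behavior, while the default 30s budget allows a failing FindCoordinator to be retried for the full timeout, which matches the observed shutdown delay.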

ShivsundarR (Collaborator, Author)

Hi @lianetm, thanks for the review.

  1. So here the abort of the FindCoordinator will only happen when the network thread closes, i.e. after all the pending async commits/acknowledgements have completed. So this should not affect the FindCoordinator request issued for an async commit, right?
    https://github.com/apache/kafka/blob/7c715c02c06f16475faff8aa72048cddb7382c8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L887C9-L888C163
  2. I agree, we could go with close(Duration.ZERO) too to achieve immediate close on ctrl-c. We just need to decide if we want this functionality or not; an immediate close on ctrl-c would be nice from a user-experience perspective.
    I am just wondering: if we make the ConsoleConsumer/ConsoleShareConsumer do a close(Duration.ZERO), then in certain cases where we actually need some time to send async commits/acknowledgements, we might force close the consumer sooner. This could be an issue, right?
    The PR currently only modifies the code when the network thread is closed (i.e. after all the commits/acks have been sent/handled), so this ideally should not affect the prior steps in closing.

kirktrue (Contributor)

Thanks for the PR, @ShivsundarR.

Since this change affects all consumer use cases, we need to tread carefully 😄

The console consumer is closing the underlying consumer with the default close() API, which uses a timeout of 30 seconds. That means the consumer is within its rights to use up all 30 seconds to clean up. The change as is looks a little too broad because it assumes that if a node isn't immediately available, the request should be aborted. This doesn't allow for the case where a node later becomes available within the timeout. There is a dependency between, e.g., OFFSET_COMMIT and FIND_COORDINATOR, so if the user is trying to commit their offsets, IMO we should try exhaustively to find the coordinator.

That said, the close() process is rife with edge cases and I'm sure there are things to fix.

Is there an approach by which we can "prove" that continuing to make requests is pointless?

ShivsundarR (Collaborator, Author)

Hi @kirktrue, thanks for the review.
Yes, I agree that the consumer should be able to use the 30 seconds if needed. I think the abort here only happens when the request need not be sent anymore. The abort happens:

  • only when onClose in the ConsumerNetworkThread is true, and

  • onClose will be true only when ConsumerNetworkThread::cleanup is called.

  • The cleanup of the network thread happens right at the end of close(), after the completion of commitSync() and the updating of callbacks.

  • And anyway, we do intend to stop sending FindCoordinator requests before the network thread closes, so ideally this should not be a problem.

  • If there happened to be a FindCoordinator request issued before the stopFindCoordinatorOnClose event was sent, it goes into a loop of retries when a node is unavailable.

  • For commitSync/acknowledgements (for ShareConsumers) which occur during close, we do NOT abort the FindCoordinator even when the node is unavailable (as onClose in NetworkClientDelegate would be false); the respective request managers handle the response when the broker is unavailable, and the process completes as it did before this change.

  • Once these stages complete, we reach the end, when the network thread itself needs to close with the remaining time on the closeTimeout.

  • Now, if there are any unsent requests with no node to connect to, we abort such requests, since we have to close the network thread anyway and issuing a FindCoordinator (even if a node were available) is no longer useful beyond this point.

This was my line of thought, does this sound good?
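The ordering above can be sketched as follows. This is an illustrative model with hypothetical names (NetworkThreadSketch, pollable), not the real Kafka internals: node-less unsent requests are aborted only once the network thread has entered cleanup, so coordinator lookups needed by close-time commits/acknowledgements are still retried normally before that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class NetworkThreadSketch {

    record Unsent(String api, Optional<String> node) {}

    private final List<Unsent> unsent = new ArrayList<>();
    private boolean onClose = false; // flipped only by cleanup()

    void add(Unsent request) {
        unsent.add(request);
    }

    // Called on every poll of the network thread.
    List<Unsent> pollable() {
        if (onClose) {
            // Past this point no coordinator lookup can succeed in time,
            // so drop requests that still have no node to connect to.
            unsent.removeIf(request -> request.node().isEmpty());
        }
        return List.copyOf(unsent);
    }

    // Runs at the very end of close(), after commitSync()/acks complete.
    void cleanup() {
        onClose = true;
    }
}
```

Gating the abort on the cleanup flag is what keeps the change narrow: before cleanup() runs, a node-less FindCoordinator request is retained and retried as usual; after cleanup(), it is discarded so the network thread can exit immediately.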
