Calling `.nack()` suspends consumer group rebalance #2128
Comments
Many thanks for the repro; it was a big help! This is an interesting problem, and it's going to be tricky to solve because we don't find out about the rebalance until we next poll the consumer. I think we'll need to change the architecture to pause the consumer for the nack sleep time and continue polling it; however, the accuracy of the sleep time will then be affected by the poll timeout.
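The proposed "pause, keep polling, resume" loop can be illustrated with a toy model. This is a sketch, not spring-kafka code: `PauseAndPollModel` and `pollReturnTimes` are hypothetical names, time is virtual, and it assumes each poll blocks for the full poll timeout because the paused partitions deliver no records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "pause, keep polling, resume"; not spring-kafka code.
// Time is virtual (milliseconds) so the sketch runs instantly and deterministically.
class PauseAndPollModel {

    /**
     * Returns the virtual times at which each poll() returns while the nacked
     * partitions are paused, ending with the poll after which they are resumed.
     * Assumes every poll blocks for the full pollTimeoutMs because paused
     * partitions deliver no records.
     */
    static List<Long> pollReturnTimes(long sleepMs, long pollTimeoutMs) {
        if (pollTimeoutMs <= 0) {
            throw new IllegalArgumentException("pollTimeoutMs must be positive");
        }
        List<Long> returns = new ArrayList<>();
        long now = 0;                  // nack() pauses the partitions at t = 0
        long resumeAt = now + sleepMs; // earliest time the partitions may be resumed
        do {
            // The thread keeps calling poll() instead of sleeping, so the client can
            // take part in a rebalance (and run rebalance listener callbacks) inside this call.
            now += pollTimeoutMs;
            returns.add(now);
        } while (now < resumeAt);      // the sleep is only checked when poll() returns
        return returns;
    }
}
```

For example, `pollReturnTimes(12_000, 5_000)` yields `[5000, 10000, 15000]`: the consumer keeps polling during the 12-second sleep, but the partitions are only resumed at 15 s, which is the accuracy cost described in the comment.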
Resolves spring-projects#2128. Suspending polling delays rebalancing; instead, pause the consumer and continue polling. Check whether partitions are already paused, pause only the currently active partitions, and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance. **cherry-pick to 2.8.x**
Resolves spring-projects#2128. Suspending polling delays rebalancing; instead, pause the consumer and continue polling. Check whether partitions are already paused, pause only the currently active partitions, and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance. Also tested with the reporter's reproducer. **cherry-pick to 2.8.x**
Resolves #2128. Suspending polling delays rebalancing; instead, pause the consumer and continue polling. Check whether partitions are already paused, pause only the currently active partitions, and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance. Also tested with the reporter's reproducer. **cherry-pick to 2.8.x**
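The partition bookkeeping described in the commit message might look roughly like the following. This is a dependency-free sketch under stated assumptions, not the actual spring-kafka implementation: `NackPauseState` and its methods are hypothetical, partitions are plain strings such as `"t-0"` instead of Kafka `TopicPartition` objects, and "re-pause as necessary" is interpreted as re-pausing the nack-paused partitions that are still assigned after the rebalance.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical bookkeeping for a nack() pause; not the spring-kafka implementation.
// Partitions are plain strings (e.g. "t-0") to keep the sketch dependency-free.
class NackPauseState {

    private final Set<String> pausedByNack = new LinkedHashSet<>();
    private long resumeAtMs = -1; // -1 means no nack sleep is in progress

    /** On nack: pause only assigned partitions that are not already paused for another reason. */
    Set<String> onNack(Set<String> assigned, Set<String> alreadyPaused, long nowMs, long sleepMs) {
        pausedByNack.clear();
        for (String tp : assigned) {
            if (!alreadyPaused.contains(tp)) {
                pausedByNack.add(tp);
            }
        }
        resumeAtMs = nowMs + sleepMs;
        return Set.copyOf(pausedByNack); // the caller would pass these to consumer.pause(...)
    }

    /** After a rebalance: forget lost partitions and return the ones that must be paused again. */
    Set<String> onRebalance(Set<String> nowAssigned) {
        pausedByNack.retainAll(nowAssigned);
        return Set.copyOf(pausedByNack); // the caller would re-pause these
    }

    /** After each poll returns: once the sleep has elapsed, resume only what nack() paused. */
    Set<String> onPollReturned(long nowMs) {
        if (resumeAtMs < 0 || nowMs < resumeAtMs) {
            return Set.of(); // still sleeping (or no nack in progress): resume nothing
        }
        Set<String> toResume = Set.copyOf(pausedByNack);
        pausedByNack.clear();
        resumeAtMs = -1;
        return toResume; // the caller would pass these to consumer.resume(...)
    }
}
```

Tracking only the partitions that `nack()` itself paused is what keeps partitions paused by the application (for example, via the container's pause API) from being resumed by accident when the sleep ends.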
Hi, since this fix, nack ignores the sleep time parameter. Messages are re-consumed 5 seconds later, no matter what the sleep time value is. Is there something I can do about it?
The default poll timeout is 5 seconds, so that's a smoking gun. I'll take a look, but the logic looks correct to me. When we receive the nack, we set …
and resume only when …
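Works as expected for me:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication
public class Kgh2128Application {

    private static final Logger log = LoggerFactory.getLogger(Kgh2128Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Kgh2128Application.class, args);
    }

    // The Acknowledgment parameter requires a manual ack mode
    // (assumed configured, e.g. spring.kafka.listener.ack-mode=manual).
    @KafkaListener(id = "kgh2128", topics = "kgh2128")
    void listen(String in, Acknowledgment ack) {
        log.info(in);
        ack.nack(15_000); // reject: this record is redelivered after the 15 s sleep
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("kgh2128").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("kgh2128", "foo");
            template.send("kgh2128", "bar");
        };
    }
}
```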
@jffourmond Please provide a similar example that exhibits the behavior you are seeing.
Thank you @garyrussell for your answer. I think the difference in behavior lies in the sleep time value we use. If I call `ack.nack(15_000)` as you do, my message is re-consumed after 15 s, as expected. But my sleep time value is 1000, because I want to retry every second. That worked as intended in spring-kafka 2.8.3. Is it bad practice to set a sleep time lower than `DEFAULT_POLL_TIMEOUT`?
I wouldn't call it an anti-pattern, but with this change, the actual sleep time will be … I will update the documentation.
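The reporter's numbers can be checked with a little arithmetic. This is a simplified model, not spring-kafka code: it assumes that while the partitions are paused each poll blocks for the full `pollTimeout` and that the resume check happens only when a poll returns, so the effective sleep is the requested sleep rounded up to a whole number of poll timeouts (never less than one). `EffectiveNackSleep` is a hypothetical helper.

```java
// Hypothetical helper modelling the effective nack sleep; not part of spring-kafka.
class EffectiveNackSleep {

    /** Requested sleep rounded up to a multiple of the poll timeout, minimum one poll timeout. */
    static long effectiveSleepMs(long requestedSleepMs, long pollTimeoutMs) {
        if (pollTimeoutMs <= 0) {
            throw new IllegalArgumentException("pollTimeoutMs must be positive");
        }
        // Ceiling division: number of polls needed before the requested sleep has elapsed.
        long polls = Math.max(1, (requestedSleepMs + pollTimeoutMs - 1) / pollTimeoutMs);
        return polls * pollTimeoutMs;
    }
}
```

Under this model, `nack(1000)` with the default 5000 ms poll timeout waits about 5000 ms, matching the 5-second redelivery reported above, while `nack(15_000)` waits 15000 ms as expected. Lowering the container's poll timeout (for example, `ContainerProperties.setPollTimeout(500)`) would bring a 1000 ms nack back to roughly 1000 ms, at the cost of more frequent polling.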
See spring-projects#2128 **cherry-pick to 2.9.x, 2.8.x**
See #2128 **cherry-pick to 2.9.x, 2.8.x**
Thanks @garyrussell. I'm not sure the …
Yes, I realized that when I updated the documentation: https://github.com/spring-projects/spring-kafka/pull/2254/files
However, that says …
Hi @garyrussell,
The Javadocs on `org.springframework.kafka.support.Acknowledgment.nack(Duration sleep)` and `org.springframework.kafka.support.Acknowledgment.nack(int index, Duration sleep)` also need updating.
@tasosz You are welcome to send a PR addressing this, or we can look at it later. Reopening the issue. Thanks!
…PollInterval to pollTimeout - Correct the Javadoc to specify `pollTimeout` instead of `maxPollInterval` for the `nack()` methods.
@tasosz @sobychacko @garyrussell I created a PR addressing the documentation issue:
- Correct the Javadoc to specify `pollTimeout` instead of `maxPollInterval` for the `nack()` methods.
In what version(s) of Spring for Apache Kafka are you seeing this issue?
I tested the following versions; they all show the same behaviour:
2.3.13, 2.5.14, 2.6.3, 2.8.2
Describe the bug
After calling `KafkaMessageListenerContainer.nack()`, if a consumer group rebalance is triggered, it won't finish until the `nack()` timeout has passed. The extended consumer group rebalance prevents every member of the consumer group from making progress with message processing.
To Reproduce
Please follow the steps described in the sample repository: https://github.com/tkornai/nack-rebalance-demo
Expected behavior
Calling `nack()` must not delay subsequent consumer group rebalances.
Sample
A link to a GitHub repository with a minimal, reproducible, sample.