Skip to content

Commit 2ae21d1

Browse files
garyrussellartembilan
authored andcommitted
Pause/Resume and Acknowledgment Improvements
- wake consumer when pausing/resuming if currently blocked in `poll()`. - use `stoppableSleep` in `nackSleepAndReset`
1 parent aec0c07 commit 2ae21d1

File tree

1 file changed

+28
-4
lines changed

1 file changed

+28
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+28-4
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,24 @@ public boolean isInExpectedState() {
292292
return isRunning() || isStoppedNormally();
293293
}
294294

295+
@Override
296+
public void pause() {
297+
super.pause();
298+
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
299+
if (consumer != null) {
300+
consumer.wakeIfNecessary();
301+
}
302+
}
303+
304+
@Override
305+
public void resume() {
306+
super.resume();
307+
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
308+
if (consumer != null) {
309+
this.listenerConsumer.wakeIfNecessary();
310+
}
311+
}
312+
295313
@Override
296314
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
297315
ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
@@ -369,7 +387,7 @@ protected void doStop(final Runnable callback, boolean normal) {
369387
if (isRunning()) {
370388
this.listenerConsumerFuture.addCallback(new StopCallback(callback));
371389
setRunning(false);
372-
this.listenerConsumer.wakeIfNecessary();
390+
this.listenerConsumer.wakeIfNecessaryForStop();
373391
setStoppedNormally(normal);
374392
}
375393
}
@@ -1303,7 +1321,7 @@ protected void pollAndInvoke() {
13031321
ConsumerRecords<K, V> records = doPoll();
13041322
if (!this.polling.compareAndSet(true, false) && records != null) {
13051323
/*
1306-
* There is a small race condition where wakeIfNecessary was called between
1324+
* There is a small race condition where wakeIfNecessaryForStop was called between
13071325
* exiting the poll and before we reset the boolean.
13081326
*/
13091327
if (records.count() > 0) {
@@ -1521,12 +1539,18 @@ private void checkRebalanceCommits() {
15211539
}
15221540
}
15231541

1524-
void wakeIfNecessary() {
1542+
void wakeIfNecessaryForStop() {
15251543
if (this.polling.getAndSet(false)) {
15261544
this.consumer.wakeup();
15271545
}
15281546
}
15291547

1548+
void wakeIfNecessary() {
1549+
if (this.polling.get()) {
1550+
this.consumer.wakeup();
1551+
}
1552+
}
1553+
15301554
private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
15311555
if (records != null) {
15321556
this.logger.debug(() -> "Received: " + records.count() + " records");
@@ -2427,7 +2451,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
24272451

24282452
private void nackSleepAndReset() {
24292453
try {
2430-
Thread.sleep(this.nackSleep);
2454+
ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this.thisOrParentContainer, this.nackSleep);
24312455
}
24322456
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
24332457
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)