Skip to content

Commit 6536f3c

Browse files
garyrussellartembilan
authored andcommitted
GH-2410: Disallow nack() with Out of Order Commits
Resolves #2410 `nack()` cannot be used with out of order commits - the contract means commit all previous offsets and redeliver remaining. - the appliation might nack multiple records. **cherry-pick to 2.9.x, 2.8.x**
1 parent 1edbbac commit 6536f3c

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -1219,6 +1219,8 @@ NOTE: If you want to commit a partial batch, using `nack()`, When using transact
12191219

12201220
IMPORTANT: `nack()` can only be called on the consumer thread that invokes your listener.
12211221

1222+
IMPORTANT: `nack()` is not allowed when using <<ooo-commits, Out of Order Commits>>.
1223+
12221224
With a record listener, when `nack()` is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`.
12231225
The consumer can be paused before redelivery, by setting the `sleep` argument.
12241226
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.
@@ -3783,7 +3785,7 @@ If you go with this approach, then you need to set this producer interceptor on
37833785
Following is an example using the same `MyProducerInterceptor` from above, but changed to not use the internal config property.
37843786

37853787
====
3786-
[source]
3788+
[source, java]
37873789
----
37883790
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
37893791

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -3382,14 +3382,10 @@ public void acknowledge() {
33823382
public void nack(Duration sleep) {
33833383
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33843384
"nack() can only be called on the consumer thread");
3385+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3386+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
33853387
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33863388
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3387-
synchronized (ListenerConsumer.this) {
3388-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3389-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3390-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3391-
}
3392-
}
33933389
}
33943390

33953391
@Override
@@ -3429,16 +3425,12 @@ public void acknowledge() {
34293425
public void nack(int index, Duration sleep) {
34303426
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
34313427
"nack() can only be called on the consumer thread");
3428+
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3429+
"nack() is not supported with out-of-order commits (asyncAcks=true)");
34323430
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
34333431
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
34343432
ListenerConsumer.this.nackIndex = index;
34353433
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
3436-
synchronized (ListenerConsumer.this) {
3437-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3438-
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
3439-
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
3440-
}
3441-
}
34423434
int i = 0;
34433435
List<ConsumerRecord<K, V>> toAck = new LinkedList<>();
34443436
for (ConsumerRecord<K, V> record : this.records) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,16 @@ void testInOrderAck(AckMode ackMode) throws Exception {
692692
containerProps.setAsyncAcks(true);
693693
final CountDownLatch latch = new CountDownLatch(4);
694694
final List<Acknowledgment> acks = new ArrayList<>();
695+
final AtomicReference<IllegalStateException> illegal = new AtomicReference<>();
695696
AcknowledgingMessageListener<Integer, String> messageListener = (data, ack) -> {
697+
if (latch.getCount() == 4) {
698+
try {
699+
ack.nack(Duration.ofSeconds(1));
700+
}
701+
catch (IllegalStateException ex) {
702+
illegal.set(ex);
703+
}
704+
}
696705
latch.countDown();
697706
acks.add(ack);
698707
if (latch.getCount() == 0) {
@@ -723,6 +732,7 @@ void testInOrderAck(AckMode ackMode) throws Exception {
723732
verify(consumer).commitSync(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(4L)),
724733
Duration.ofMinutes(1));
725734
container.stop();
735+
assertThat(illegal.get()).isNotNull();
726736
}
727737

728738
private static Stream<Arguments> testInOrderAckPauseUntilAckedParamters() {

0 commit comments

Comments
 (0)