Acknowledgment.nack() leads to infinite consumer pause when asyncAck property set to true #2410

Closed
Neverhood565 opened this issue Sep 24, 2022 · 6 comments · Fixed by #2436
Comments

@Neverhood565

Neverhood565 commented Sep 24, 2022

Version 2.8.9

Description

Calling Acknowledgment.nack() pauses the consumer indefinitely when the container property asyncAcks is set to true.

The problem seems to be in the nack() implementation: it clears the values held in offsetsInThisBatch, but not the map itself:

```java
@Override
public void nack(Duration sleep) {
    ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
    synchronized (ListenerConsumer.this) {
        if (ListenerConsumer.this.offsetsInThisBatch != null) {
            ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
            ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
        }
    }
}
```

A subsequent call to doResumeConsumerIfNeccessary() does not resume the consumer, because offsetsInThisBatch.size() > 0: the map still holds an entry for each partition, even though those entries are now empty lists.
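A minimal model of why the consumer stays paused, using a plain HashMap in place of the container's internal state. PendingOffsetsModel and its methods are hypothetical names for illustration, not spring-kafka code. Clearing each partition's list leaves the keys in place, so a check based on size() still sees a non-empty map:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: partition name -> offsets still awaiting an ack.
final class PendingOffsetsModel {
    final Map<String, List<Long>> offsetsInThisBatch = new HashMap<>();

    void track(String partition, long offset) {
        offsetsInThisBatch.computeIfAbsent(partition, p -> new ArrayList<>()).add(offset);
    }

    // Mirrors the reported nack() behaviour: empties each list but keeps every key.
    void clearValuesOnly() {
        offsetsInThisBatch.forEach((part, recs) -> recs.clear());
    }

    // Resume check as described in the report: any remaining entry blocks the resume.
    boolean canResumeBySize() {
        return offsetsInThisBatch.size() == 0;
    }

    // Alternative check that treats partitions with empty lists as having nothing pending.
    boolean canResumeByContents() {
        return offsetsInThisBatch.values().stream().allMatch(List::isEmpty);
    }
}
```

After tracking offsets 0 and 1 for one partition and calling clearValuesOnly(), canResumeBySize() returns false while canResumeByContents() returns true. The content-based check only shows why size() is misleading here; it is not the fix that was adopted for this issue.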

To Reproduce

Call nack() with asyncAcks set to true.

Expected behavior

The consumer resumes reading messages after the specified sleep timeout.

Sample

https://github.com/Neverhood565/spring-kafka-nack-bug

@garyrussell
Contributor

Thanks for reporting. Actually, we shouldn't be completely clearing pendingAcks and offsetsInThisBatch at all; we should only remove offsets for records beyond the current record, and complete any pending acks when the missing acks are received.

Bear in mind that using async acks with nack is going to cause redelivery of some records that were previously acked.

Let's say we deliver offsets 0-5, and you ack 0, 2, 4, and 5, nack 3, and then ack 1.

With this code corrected, we will commit through offset 2 (records 0-2) and redeliver 3, 4, and 5.
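The scenario above can be modeled roughly as follows, assuming the corrected behavior described in this comment: a nack discards acks for the nacked record and everything after it, and the commit only advances over a contiguous run of acked offsets. OutOfOrderAckModel is a hypothetical class, not part of spring-kafka. Kafka stores the offset of the next record to read, so committing records 0-2 means committing position 3:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of out-of-order acks within one batch, plus a single nack.
final class OutOfOrderAckModel {
    private final long firstOffset;
    private final long lastOffset;
    private final TreeSet<Long> acked = new TreeSet<>();
    private Long nackedOffset; // null until nack() is called

    OutOfOrderAckModel(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    void ack(long offset) {
        // Acks for the nacked record or later ones are ignored: those records are redelivered.
        if (nackedOffset == null || offset < nackedOffset) {
            acked.add(offset);
        }
    }

    void nack(long offset) {
        nackedOffset = offset;
        // Forget acks already received for the nacked record and everything after it.
        acked.tailSet(offset, true).clear();
    }

    // Kafka commit position: the first offset, from the start of the batch, not yet acked.
    long commitPosition() {
        long next = firstOffset;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    // Records fetched again after the nack sleep: the nacked record through the end of the batch.
    List<Long> toRedeliver() {
        List<Long> out = new ArrayList<>();
        if (nackedOffset != null) {
            for (long o = nackedOffset; o <= lastOffset; o++) {
                out.add(o);
            }
        }
        return out;
    }
}
```

Feeding in the sequence from the example (ack 0, 2, 4, 5; nack 3; ack 1) leaves commitPosition() at 3 and toRedeliver() as [3, 4, 5], so records 4 and 5 are delivered a second time even though they were acked.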

The bottom line is that if you use out-of-order commits with nack, you will need to deal with possible duplicates.

@garyrussell
Contributor

It turns out that this is much more complicated than I thought it would be.

Can you explain your use case? Given that nack() must be called on the listener thread, it is not clear how or why you would use out-of-order commits (asyncAcks) with nack(); right now, I am inclined to simply disallow it.

@garyrussell
Contributor

garyrussell commented Sep 28, 2022

Never mind, I found a rather simple solution.

@garyrussell
Contributor

Nope, that didn't work; I am back to being inclined to disallow it, but I will give it some more thought overnight.

@davidleacy

We also ran into this issue and were not expecting the above behaviour. We wanted to take advantage of asynchronous calls to KafkaTemplate.send(). If a nack() occurred, we would reprocess the messages after the nack(), as you mentioned above; in our case, duplicate records sent to the output topic are handled in application logic.
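A rough sketch of how asynchronous sends produce out-of-order acks: each record is acknowledged only when its send completes, and sends can complete in any order. A plain CompletableFuture stands in for the future returned by KafkaTemplate.send(); RecordAck and AsyncSendModel are hypothetical stand-ins for Acknowledgment and the listener, not spring-kafka types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a per-record acknowledgment.
interface RecordAck {
    void acknowledge();
}

final class AsyncSendModel {
    final List<Long> ackOrder = new ArrayList<>();
    final List<CompletableFuture<Void>> pendingSends = new ArrayList<>();

    // Simulates a listener that starts a send and acknowledges only when that send completes.
    void onRecord(long offset) {
        CompletableFuture<Void> send = new CompletableFuture<>(); // stands in for an async send result
        RecordAck ack = () -> ackOrder.add(offset);
        send.thenRun(ack::acknowledge);
        pendingSends.add(send);
    }
}
```

If records 0, 1, and 2 are received and their sends complete in the order 2, 0, 1, the acknowledgments arrive in that same order, which is exactly the out-of-order pattern that asyncAcks is meant to absorb.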

Perhaps there could be a property that sets a maximum time for a nack()'d record to be acknowledged, after which the messages are redelivered, to avoid cases such as the above, or poison pills?

@garyrussell
Contributor

nack() is simply not designed to be used with out-of-order commits; the contract of nack() is "commit the offsets of all previous records and redeliver this, and subsequent records, after some delay".
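For a single batch delivered and processed in order, that contract reduces to a split at the nacked record. NackContract and NackOutcome are hypothetical names used only for this sketch:

```java
import java.util.List;

// Hypothetical result type: where to commit and which offsets come back after the sleep.
final class NackOutcome {
    final long commitPosition;
    final List<Long> redeliver;

    NackOutcome(long commitPosition, List<Long> redeliver) {
        this.commitPosition = commitPosition;
        this.redeliver = redeliver;
    }
}

final class NackContract {
    // offsets: one batch of delivered offsets, in order; nackIndex: position of the nacked record.
    static NackOutcome apply(List<Long> offsets, int nackIndex) {
        if (nackIndex < 0 || nackIndex >= offsets.size()) {
            throw new IllegalArgumentException("nackIndex out of range: " + nackIndex);
        }
        // All earlier records are committed; the committed position is the next offset to read,
        // which is the nacked record's own offset.
        long commitPosition = offsets.get(nackIndex);
        // The nacked record and every later record in the batch are redelivered after the sleep.
        List<Long> redeliver = List.copyOf(offsets.subList(nackIndex, offsets.size()));
        return new NackOutcome(commitPosition, redeliver);
    }
}
```

For offsets [10, 11, 12, 13] with the record at index 2 nacked, the sketch commits position 12 and redelivers [12, 13]. With out-of-order commits, earlier records may still be unacknowledged when nack() is called, so this clean split no longer holds.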

When using out-of-order commits, it is impossible to honor that contract; in fact, the application might nack() multiple records concurrently.

It was an oversight to allow nack() while asyncAcks is true.
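The resolution was to stop allowing nack() together with asyncAcks. The sketch below shows one way such a guard could look; GuardedAck is hypothetical, and the exception type and message are assumptions, not necessarily what the actual change uses:

```java
import java.time.Duration;

// Hypothetical acknowledgment that refuses nack() when out-of-order commits are enabled.
final class GuardedAck {
    private final boolean asyncAcks;
    private long nackSleepMillis = -1; // -1 means nack() has not been called

    GuardedAck(boolean asyncAcks) {
        this.asyncAcks = asyncAcks;
    }

    void nack(Duration sleep) {
        if (asyncAcks) {
            // The "commit all previous, redeliver the rest" contract cannot be honored out of order.
            throw new IllegalStateException("nack() is not supported with out-of-order commits (asyncAcks=true)");
        }
        // In-order case: remember the sleep; seeking back to the nacked record is not modeled here.
        nackSleepMillis = sleep.toMillis();
    }

    long nackSleepMillis() {
        return nackSleepMillis;
    }
}
```

Failing fast at the nack() call surfaces the misconfiguration immediately, instead of leaving the consumer paused with no error.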

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 12, 2022
Resolves spring-projects#2410

`nack()` cannot be used with out of order commits
- the contract means commit all previous offsets and redeliver remaining.
- the application might nack multiple records.

**cherry-pick to 2.9.x, 2.8.x**
@garyrussell garyrussell modified the milestones: Backlog, 3.0.0-RC1 Oct 12, 2022
@garyrussell garyrussell self-assigned this Oct 12, 2022
artembilan pushed a commit that referenced this issue Oct 12, 2022
Resolves #2410

`nack()` cannot be used with out of order commits
- the contract means commit all previous offsets and redeliver remaining.
- the application might nack multiple records.

**cherry-pick to 2.9.x, 2.8.x**
artembilan pushed a commit that referenced this issue Oct 12, 2022
artembilan pushed a commit that referenced this issue Oct 12, 2022