
[FIXED] Only deliver replicated message after quorum #6792

Merged
merged 2 commits into main from maurice/replicate-delivered on Apr 17, 2025

Conversation

@MauriceVanVeen (Member)

Related to #6469, about the following code:

	// Update delivered first.
	o.updateDelivered(dseq, seq, dc, ts)

	// Send message.
	o.outq.send(pmsg)

o.updateDelivered requires proposing delivered state through Raft, but even if the proposal fails, we immediately send the message to the client. This is great for performance, but really bad for properly replicating this piece of data. Before the aforementioned PR (#6469) there were a bunch of nasty side effects: stuck consumers, perceived data loss through missed redeliveries, etc., because clients could get messages that a new leader wouldn't know about if proposals failed.

The core issue is that we should only send the message AFTER we have quorum on updating the delivered state. Otherwise the following could happen: the message gets sent to the client, the updateDelivered proposal fails, the leader changes, and AckSync will now time out indefinitely, even with retries, because the new leader doesn't know the message was ever delivered.
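
Conceptually, the fix flips that order: remember the message, propose the delivered update, and only send once the proposal has been applied with quorum. A rough sketch of the resulting flow (takeReplicatedQueuedMsg and the apply-side wiring are illustrative assumptions, not the actual implementation):

	// Remember the message first, keyed by stream sequence.
	o.addReplicatedQueuedMsg(pmsg)

	// Propose delivered state through Raft.
	o.updateDelivered(dseq, seq, dc, ts)

	// Later, when the Raft layer applies the delivered update (i.e. quorum
	// was reached), the stored message is actually sent to the client:
	if pmsg := o.takeReplicatedQueuedMsg(seq); pmsg != nil {
		o.outq.send(pmsg)
	}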

Signed-off-by: Maurice van Veen [email protected]

@MauriceVanVeen (Member Author)

Keeping this PR as a draft for a little bit, to do some more testing using both our internal tooling and Antithesis.
But it can be reviewed already; just holding off on merging for now.

@derekcollison (Member)

This will most surely tank performance of ordered consumers for quick consumption.

@MauriceVanVeen (Member Author)

> This will most surely tank performance of ordered consumers for quick consumption.

Ordered non-replicated consumers are not impacted; only replicated consumers are, and those need proper replication.

@derekcollison (Member)

With any replicated consumer, even low RTTs will limit performance. In general we want to be able to consume faster than you can ingest. So let's do some benchmarks with real RTT between servers and discuss.

@derekcollison (Member)

That is why I would send periodic consumer snapshots to sync replicas.

@MauriceVanVeen (Member Author)

Tested and did some benchmarking. Antithesis testing was no longer able to run into the 'infinite timing out of AckSync' 🎉
Benchmarking did of course show lower performance (because now the data is actually replicated), but if the workload actually does some work (and doesn't simply ack immediately), the app's processing time starts canceling out the added RTT. That's especially true when using the new JetStream API, since it sends fetch requests early when using Consume.

This PR ensures data correctness and allows 'exactly once' consumption. The latter was not always true during/after leader changes prior to this PR. Regarding the former, the message delivery counter did not monotonically increase for every delivery, because the message was sent to the client first and only replicated afterward; if that replication failed, the next delivery would report an incorrect delivery count.

Also reiterating that this is only about replicated consumers. Ordered/AckNone/FlowControlled/non-replicated consumers remain the same.

@MauriceVanVeen MauriceVanVeen marked this pull request as ready for review April 14, 2025 13:41
@MauriceVanVeen MauriceVanVeen requested a review from a team as a code owner April 14, 2025 13:41
@neilalexander (Member) left a comment

Overall I think this is a good change and I strongly believe prioritising correctness is the right thing to do here. I do see that it may potentially reduce performance on some replicated consumers, but that will depend heavily on the usage pattern. We also need to keep in mind that some replicated streams with interest and WQ retention policies will not be able to avoid this penalty by scaling consumers.

@neilalexander (Member) left a comment

LGTM, but I think this also needs @derekcollison's approval.

// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	// Is not explicitly limited in size, but will at maximum hold maximum ack pending.
Member

Are we sure this comment is true?

Member Author

The stream sequence is used as the key, the same as for o.pending. That means it can only ever become as large as the maximum size of o.pending, i.e. MaxAckPending.
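
For illustration, a minimal self-contained sketch of that bound (field names other than addReplicatedQueuedMsg and jsPubMsg are assumptions, not the actual nats-server types):

	// jsPubMsg is a queued delivery; seq is its stream sequence (assumed field name).
	type jsPubMsg struct {
		seq uint64
		// subject, payload, etc.
	}

	type consumer struct {
		// In-flight (unacked) deliveries, keyed by stream sequence,
		// bounded by MaxAckPending.
		pending map[uint64]struct{}
		// Messages held back until delivered-state quorum, using the same
		// keys as pending, so it can never outgrow MaxAckPending either.
		replicatedQueued map[uint64]*jsPubMsg
	}

	// addReplicatedQueuedMsg remembers a message to send after quorum.
	// Lock should be held.
	func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
		if o.replicatedQueued == nil {
			o.replicatedQueued = make(map[uint64]*jsPubMsg)
		}
		o.replicatedQueued[pmsg.seq] = pmsg
	}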

Member

OK, so when a new message comes into the stream and we kick the consumer, we just bail because we hit max pending, yes?

Member Author

o.getNextMsg bails based on hitting max pending, indeed.
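
For context, the kind of guard involved, heavily simplified (the field and error names here are assumptions, not the exact nats-server code):

	// In getNextMsg (simplified): stop handing out messages once the
	// number of unacked deliveries reaches MaxAckPending, which also
	// bounds the replicated-queued map discussed above.
	if o.maxp > 0 && len(o.pending) >= o.maxp {
		return nil, errMaxAckPending
	}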

Member

Understood, but I am wondering if we should short-circuit in the stream's signalConsumers(). WDYT?

Member Author

Could potentially be an additional optimization, although I can't say how large the gain would be.
Either way, I think it should be in a different PR.

Member

Let's do that short-circuiting PR after this one.

@derekcollison derekcollison self-requested a review April 17, 2025 15:18
@derekcollison (Member) left a comment

LGTM

@neilalexander (Member)

@MauriceVanVeen Happy to rebase on top of latest main? Then I'll merge.

@MauriceVanVeen MauriceVanVeen force-pushed the maurice/replicate-delivered branch from 62e57b1 to a173419 Compare April 17, 2025 15:37
@neilalexander neilalexander merged commit 5697176 into main Apr 17, 2025
105 of 108 checks passed
@neilalexander neilalexander deleted the maurice/replicate-delivered branch April 17, 2025 16:11
neilalexander added a commit that referenced this pull request Jun 10, 2025
…quest timeout (#6960)

Since consumer messages are now replicated fully prior to delivery, this
broke `NoWait` pull requests: `NoWait`'s request timeout would be sent
before the replication could complete. For a 2.11 patch version we now
bypass replicating first and send the messages to the client immediately,
without waiting for replication (just like we do for flow-controlled
consumers, AckNone, etc.).

For 2.12 or later we might need to look into replicating first and
sending `NoWait`'s request timeout after the replicated messages are
sent.

Introduced by #6792

Resolves #6952

Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Piotr Piotrowski <[email protected]>