[FIXED] Only deliver replicated message after quorum #6792
Conversation
Keeping this PR as draft for a little bit, to still do some more testing using both our internal tooling and Antithesis.
This will most surely tank performance of ordered consumers for quick consumption.
Ordered non-replicated consumers are not impacted. Only replicated consumers, those need proper replication.
With any replicated consumer, even low RTTs will limit performance. In general we want to be able to consume faster than you can ingest. So let's do some benchmarks with real RTT between servers and discuss.
That is why I would send periodic consumer snapshots to sync replicas.
Tested and did some benchmarking. Antithesis testing was not able to run into the 'infinite timing out of AckSync' anymore 🎉 This PR ensures data correctness and allows 'exactly once' consumption. The latter was not always true during/after leader changes prior to this PR. Regarding the former, there were data correctness issues where the message delivered counter did not monotonically increase for every delivery, because the message was sent to the client first and only replicated after; that replication could fail, and the next delivery would then report an incorrect delivery count. Also reiterating that this is only about replicated consumers. Ordered/AckNone/FlowControlled/non-replicated consumers remain the same.
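To make the ordering change concrete, here is a minimal Go sketch of the difference described above; the names and types (deliverThenReplicate, replicateThenDeliver, propose, send) are hypothetical and not the server's actual internals:

```go
// Hypothetical sketch of the ordering change; names and types are
// illustrative, not the actual nats-server internals.
package main

import "fmt"

type msg struct {
	streamSeq     uint64
	deliveryCount uint64
}

// Old behavior: bump the counter and send first, replicate after.
// If the proposal is lost and the leader changes, the new leader never
// learns about this delivery, so a later redelivery can report a
// delivery count that is wrong from the client's point of view.
func deliverThenReplicate(m *msg, propose func(*msg) error, send func(*msg)) {
	m.deliveryCount++
	send(m)
	_ = propose(m) // a failure here is invisible to the client
}

// New behavior for replicated consumers: replicate the delivered state
// and wait for quorum before anything is sent to the client.
func replicateThenDeliver(m *msg, proposeAndWait func(*msg) error, send func(*msg)) error {
	m.deliveryCount++
	if err := proposeAndWait(m); err != nil {
		return err // nothing was sent, so no inconsistent delivery state
	}
	send(m)
	return nil
}

func main() {
	m := &msg{streamSeq: 1}
	_ = replicateThenDeliver(m,
		func(*msg) error { return nil },
		func(m *msg) { fmt.Println("delivered", m.streamSeq, m.deliveryCount) })
}
```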
Overall I think this is a good change and I strongly believe prioritising correctness is the right thing to do here. I do see that it may potentially reduce performance on some replicated consumers, but that will depend heavily on the usage pattern, and we also need to keep in mind that some replicated streams with interest and WQ retention policies will not be able to avoid this penalty by scaling consumers.
LGTM, but I think this also needs @derekcollison's approval.
// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	// Is not explicitly limited in size, but will at maximum hold maximum ack pending.
Are we sure this comment is true?
The stream sequence is used as a key, which is the same for o.pending. That means it can only ever become as large as the max length of o.pending, i.e. MaxAckPending.
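A minimal sketch of how such a queue stays bounded by MaxAckPending when keyed by stream sequence; the field name replicatedQueuedMsgs and the function body below are illustrative guesses, not the actual implementation:

```go
// Hypothetical sketch: queuing a message for delivery after quorum,
// keyed by stream sequence. Field names are illustrative.
package main

type jsPubMsg struct {
	seq  uint64 // stream sequence
	data []byte
}

type consumer struct {
	maxp                 int                  // MaxAckPending
	pending              map[uint64]struct{}  // outstanding unacked deliveries
	replicatedQueuedMsgs map[uint64]*jsPubMsg // hypothetical field name
}

// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	if o.replicatedQueuedMsgs == nil {
		o.replicatedQueuedMsgs = make(map[uint64]*jsPubMsg)
	}
	// Keyed by stream sequence, same as o.pending, so the map holds at
	// most as many entries as o.pending can, i.e. MaxAckPending.
	o.replicatedQueuedMsgs[pmsg.seq] = pmsg
}

func main() {
	o := &consumer{maxp: 1000, pending: map[uint64]struct{}{}}
	o.addReplicatedQueuedMsg(&jsPubMsg{seq: 42})
}
```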
ok so when a new message comes into the stream and we kick the consumer, we just bail because we hit max pending, yes?
o.getNextMsg bails based on hitting max pending, indeed.
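For context, a rough sketch of the kind of max-pending guard being referred to; the check below is illustrative only, the real getNextMsg has many more conditions:

```go
// Illustrative only: a guard of this shape is what "bails based on
// hitting max pending" refers to.
package main

import "errors"

var errMaxAckPending = errors.New("consumer max ack pending reached")

type consumer struct {
	maxp    int                 // MaxAckPending
	pending map[uint64]struct{} // outstanding unacked deliveries
}

// getNextMsg (sketch): refuse to hand out another message once the
// number of outstanding deliveries reaches MaxAckPending.
func (o *consumer) getNextMsg() error {
	if o.maxp > 0 && len(o.pending) >= o.maxp {
		return errMaxAckPending
	}
	// ... load and return the next message here ...
	return nil
}

func main() {
	o := &consumer{maxp: 1, pending: map[uint64]struct{}{1: {}}}
	_ = o.getNextMsg() // returns errMaxAckPending
}
```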
Understood, but I am wondering if we should short-circuit in the stream's signalConsumers()? WDYT?
Could potentially be an additional optimization, although can't say how large the gain would be.
Either way I think it should be in a different PR.
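The optimization being discussed would look roughly like this hypothetical short-circuit in the stream's signalConsumers() path; it is not part of this PR, and the names and structure are illustrative:

```go
// Hypothetical sketch of the suggested short-circuit: skip kicking a
// consumer that is already at MaxAckPending, since getNextMsg would
// bail anyway.
package main

type consumer struct {
	maxp    int
	pending map[uint64]struct{}
	kicked  bool
}

func (o *consumer) atMaxAckPending() bool {
	return o.maxp > 0 && len(o.pending) >= o.maxp
}

type stream struct {
	consumers []*consumer
}

func (mset *stream) signalConsumers() {
	for _, o := range mset.consumers {
		if o.atMaxAckPending() {
			continue // short-circuit: no point waking this consumer up
		}
		o.kicked = true // stand-in for the actual signal
	}
}

func main() {
	mset := &stream{consumers: []*consumer{{maxp: 1, pending: map[uint64]struct{}{1: {}}}}}
	mset.signalConsumers()
}
```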
Let's do that short-circuit PR after this one.
LGTM
@MauriceVanVeen Happy to rebase on top of latest main? Then I'll merge.
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Force-pushed from 62e57b1 to a173419.
…quest timeout (#6960)

Since consumer messages are now replicated fully prior to delivery, this broke `NoWait` pull requests. `NoWait`'s request timeout would be sent earlier than the replication could complete. For a 2.11 patch version we now bypass replicating first, and send the messages to the client immediately without waiting for replication (just like we do for flow-controlled consumers, AckNone, etc.). We might need to look into replicating first and sending `NoWait`'s request timeout after the replicated messages are sent, for 2.12 or later.

Introduced by #6792
Resolves #6952

Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Piotr Piotrowski <[email protected]>
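A hedged sketch of the delivery decision described in that follow-up; the predicate and field names below are illustrative, not the server's actual fields:

```go
// Illustrative decision sketch: which deliveries wait for replication
// quorum and which are sent immediately. Field names are hypothetical.
package main

type consumer struct {
	replicas    int
	ackNone     bool
	flowControl bool
	noWait      bool // pending pull request used NoWait
}

// waitForQuorumBeforeDelivery reflects the behavior described above:
// only "normal" replicated consumers replicate delivered state before
// sending; NoWait, flow-controlled, AckNone and R1 consumers deliver
// immediately so their timeouts and semantics are unaffected.
func (o *consumer) waitForQuorumBeforeDelivery() bool {
	if o.replicas <= 1 {
		return false
	}
	if o.ackNone || o.flowControl || o.noWait {
		return false
	}
	return true
}

func main() {
	o := &consumer{replicas: 3, noWait: true}
	_ = o.waitForQuorumBeforeDelivery() // false: NoWait bypasses replication-first
}
```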
Related to #6469, about the o.updateDelivered code path.

o.updateDelivered requires proposing delivered state through Raft, and even if proposing fails, we immediately send the message to the client. This is great for performance, but really bad for properly replicating this piece of data. Before the aforementioned PR there would be a bunch of nasty side effects: stuck consumers, perceived data loss through missed redeliveries, etc., because clients could get messages that a new leader wouldn't know about if proposals failed.

The core issue is that we should only send the message AFTER we have quorum on updating the delivered state. Otherwise the following could happen: the message gets sent to the client, the updateDelivered proposal fails, the leader changes, and AckSync will now time out indefinitely even with retries, because the new leader doesn't know this message was even delivered.

Signed-off-by: Maurice van Veen [email protected]
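Tying the pieces together, a hypothetical end-to-end sketch of the fix this PR describes: propose the delivered state, queue the message, and only send it once the proposal has quorum. The queue field and the onDeliveredApplied callback name are illustrative, not the server's actual API:

```go
// Hypothetical end-to-end flow of the fix. Names are illustrative.
package main

import "fmt"

type jsPubMsg struct{ seq uint64 }

type consumer struct {
	queued  map[uint64]*jsPubMsg
	propose func(seq uint64) error
	send    func(*jsPubMsg)
}

// deliver replaces the old "send first, propose after" path: the
// message is queued and the delivered update is proposed, but nothing
// is sent to the client yet.
func (o *consumer) deliver(pmsg *jsPubMsg) error {
	if o.queued == nil {
		o.queued = make(map[uint64]*jsPubMsg)
	}
	o.queued[pmsg.seq] = pmsg
	return o.propose(pmsg.seq)
}

// onDeliveredApplied runs once the proposal for this sequence has
// quorum; only now is the message released to the client, so a new
// leader always knows about every delivery the client has seen.
func (o *consumer) onDeliveredApplied(seq uint64) {
	if pmsg, ok := o.queued[seq]; ok {
		delete(o.queued, seq)
		o.send(pmsg)
	}
}

func main() {
	o := &consumer{
		propose: func(uint64) error { return nil },
		send:    func(m *jsPubMsg) { fmt.Println("delivered", m.seq) },
	}
	_ = o.deliver(&jsPubMsg{seq: 1})
	o.onDeliveredApplied(1)
}
```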