Skip to content

Commit c543f53

Browse files
Improve consumer pending count tracking during stream contention (#6297)
The drifting tests would occasionally fail because the consumer pending count drifted. This was caused by a race condition described on `checkNumPending`:

```
// Does some sanity checks to see if we should re-calculate.
// Since there is a race when decrementing when there is contention at the beginning of the stream.
// The race is a getNextMsg skips a deleted msg, and then the decStreamPending call fires.
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) checkNumPending() uint64 {
```

This PR doesn't fix the race condition, but it improves the tracking, which in turn improves test reliability. If the race condition happens, we can still check whether the deleted message lies between `o.asflr` and the `o.sseq` that has skipped ahead. In that case we can still decrement the pending count (`o.npc`) if the message is not pending/delivered. This keeps the pending count tracking reliable as long as the ack floor hasn't moved up yet.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents 4ece724 + 0d0720d commit c543f53

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

server/consumer.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5835,17 +5835,22 @@ func (o *consumer) requestNextMsgSubject() string {
58355835

58365836
func (o *consumer) decStreamPending(sseq uint64, subj string) {
58375837
o.mu.Lock()
5838-
// Update our cached num pending only if we think deliverMsg has not done so.
5839-
if sseq >= o.sseq && o.isFilteredMatch(subj) {
5840-
o.npc--
5841-
}
58425838

58435839
// Check if this message was pending.
58445840
p, wasPending := o.pending[sseq]
58455841
var rdc uint64 = 1
58465842
if o.rdc != nil {
58475843
rdc = o.rdc[sseq]
58485844
}
5845+
5846+
// Update our cached num pending only if we think deliverMsg has not done so.
5847+
// Either we have not reached the message yet, or we've hit the race condition
5848+
// when there is contention at the beginning of the stream. In which case we can
5849+
// only decrement if the ack floor is still low enough to be able to detect it.
5850+
if o.isFilteredMatch(subj) && sseq > o.asflr && (sseq >= o.sseq || !wasPending) {
5851+
o.npc--
5852+
}
5853+
58495854
o.mu.Unlock()
58505855

58515856
// If it was pending process it like an ack.

server/jetstream_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24737,3 +24737,68 @@ func TestJetStreamWouldExceedLimits(t *testing.T) {
2473724737
require_True(t, js.wouldExceedLimits(MemoryStorage, int(js.config.MaxMemory)+1))
2473824738
require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1))
2473924739
}
24740+
24741+
// TestJetStreamConsumerDecrementPendingCountOnSkippedMsg checks how the consumer's cached
// pending count (o.npc) changes as messages are published and deleted, and how
// decStreamPending treats sequences below a manually advanced o.sseq: a sequence that is
// present in o.pending must leave o.npc untouched, while one that is absent must decrement it.
func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) {
24742+
s := RunBasicJetStreamServer(t)
24743+
defer s.Shutdown()
24744+
24745+
nc, js := jsClientConnect(t, s)
24746+
defer nc.Close()
24747+
24748+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
24749+
require_NoError(t, err)
24750+
24751+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
24752+
require_NoError(t, err)
24753+
24754+
acc, err := s.lookupAccount(globalAccountName)
24755+
require_NoError(t, err)
24756+
mset, err := acc.lookupStream("TEST")
24757+
require_NoError(t, err)
24758+
o := mset.lookupConsumer("CONSUMER")
24759+
24760+
// requireExpected asserts the consumer's cached pending count (o.npc) while holding the read lock.
requireExpected := func(expected int64) {
24761+
t.Helper()
24762+
o.mu.RLock()
24763+
defer o.mu.RUnlock()
24764+
require_Equal(t, o.npc, expected)
24765+
}
24766+
24767+
// Should initially report no messages available.
24768+
requireExpected(0)
24769+
24770+
// A new message is available, should report in pending.
24771+
_, err = js.Publish("foo", nil)
24772+
require_NoError(t, err)
24773+
requireExpected(1)
24774+
24775+
// Pending count should decrease when the message is deleted.
24776+
err = js.DeleteMsg("TEST", 1)
24777+
require_NoError(t, err)
24778+
requireExpected(0)
24779+
24780+
// Make more messages available, should report in pending.
24781+
_, err = js.Publish("foo", nil)
24782+
require_NoError(t, err)
24783+
_, err = js.Publish("foo", nil)
24784+
require_NoError(t, err)
24785+
requireExpected(2)
24786+
24787+
// Simulate getNextMsg having run: push the starting sequence far past the stored messages, as if it skipped a deleted one.
24788+
// Also simulate one delivery by hand: decrement npc and record sequence 2 as pending.
24789+
o.mu.Lock()
24790+
o.sseq = 100
24791+
o.npc--
24792+
o.pending = make(map[uint64]*Pending)
24793+
o.pending[2] = &Pending{}
24794+
o.mu.Unlock()
24795+
24796+
// Since this message is pending we should not decrement pending count as we've done so already.
24797+
o.decStreamPending(2, "foo")
24798+
requireExpected(1)
24799+
24800+
// This is the deleted message that was skipped, and we can decrement the pending count
24801+
// because it's not pending and only as long as the ack floor hasn't moved up yet.
24802+
o.decStreamPending(3, "foo")
24803+
requireExpected(0)
24804+
}

0 commit comments

Comments
 (0)