diff --git a/server/consumer.go b/server/consumer.go index 07fd4a079da..0ffd72af57a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5836,6 +5836,11 @@ func (o *consumer) requestNextMsgSubject() string { func (o *consumer) decStreamPending(sseq uint64, subj string) { o.mu.Lock() + // Update our cached num pending only if we think deliverMsg has not done so. + if sseq >= o.sseq && o.isFilteredMatch(subj) { + o.npc-- + } + // Check if this message was pending. p, wasPending := o.pending[sseq] var rdc uint64 = 1 @@ -5843,14 +5848,6 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { rdc = o.rdc[sseq] } - // Update our cached num pending only if we think deliverMsg has not done so. - // Either we have not reached the message yet, or we've hit the race condition - // when there is contention at the beginning of the stream. In which case we can - // only decrement if the ack floor is still low enough to be able to detect it. - if sseq > o.asflr && (sseq >= o.sseq || !wasPending) && o.isFilteredMatch(subj) { - o.npc-- - } - o.mu.Unlock() // If it was pending process it like an ack. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 01aedccf51c..a83e8ca39b6 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24742,7 +24742,7 @@ func TestJetStreamWouldExceedLimits(t *testing.T) { require_True(t, js.wouldExceedLimits(FileStorage, int(js.config.MaxStore)+1)) } -func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) { +func TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -24768,7 +24768,7 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) { npc := o.npc o.mu.RUnlock() if npc != expected { - return fmt.Errorf("expected npc=%d, got %d", npc, expected) + return fmt.Errorf("expected npc=%d, got %d", expected, npc) } return nil }) @@ -24807,9 +24807,66 @@ func TestJetStreamConsumerDecrementPendingCountOnSkippedMsg(t *testing.T) { o.decStreamPending(2, "foo") requireExpected(1) - // This is the deleted message that was skipped, and we can decrement the pending count - // because it's not pending and only as long as the ack floor hasn't moved up yet. + // This is the deleted message that was skipped, we've hit the race condition and are not able to + // fix it at this point. If we decrement then we could have decremented it twice if the message + // was removed as a result of an Ack with Interest or WorkQueue retention, instead of due to contention. o.decStreamPending(3, "foo") + requireExpected(1) +} + +func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + }) + require_NoError(t, err) + + for i := 0; i < 2; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + + requireExpected := func(expected int64) { + t.Helper() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + o.mu.RLock() + npc := o.npc + o.mu.RUnlock() + if npc != expected { + return fmt.Errorf("expected npc=%d, got %d", expected, npc) + } + return nil + }) + } + + // Expect 2 messages pending. + requireExpected(2) + + // Fetch 2 messages and ack the last. + msgs, err := sub.Fetch(2) + require_NoError(t, err) + require_Len(t, len(msgs), 2) + msg := msgs[1] + err = msg.AckSync() + require_NoError(t, err) + + // We've fetched 2 message so should report 0 pending. requireExpected(0) }