[FIXED] Fix DeliverLastPerSubject consumer policy with interior deletes #7005

Merged · 1 commit · Jun 25, 2025
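For context, the scenario the fix targets can be sketched from the client side with nats.go: a stream with MaxMsgsPerSubject discards the older copies of each subject, which leaves interior deletes (gaps) in the stream's sequence space, and a DeliverLastPerSubject consumer then has to walk its per-subject skiplist across those gaps. The sketch below is illustrative only; the stream, subject, and consumer names (DEMO, foo.*, last-per-subject) are not part of this change.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// MaxMsgsPerSubject: 1 means every re-publish on a subject removes the
	// previous copy, creating interior deletes in the stream's sequence space.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:              "DEMO",
		Subjects:          []string{"foo.>"},
		MaxMsgsPerSubject: 1,
	}); err != nil {
		log.Fatal(err)
	}

	// Publish twice per subject so the first copy of each gets deleted.
	for i := 0; i < 2; i++ {
		for s := 0; s < 5; s++ {
			if _, err := js.Publish(fmt.Sprintf("foo.%d", s), nil); err != nil {
				log.Fatal(err)
			}
		}
	}

	// A DeliverLastPerSubject pull consumer should deliver exactly one
	// message (the last) per subject despite the gaps left behind.
	sub, err := js.PullSubscribe("foo.>", "last-per-subject",
		nats.BindStream("DEMO"), nats.DeliverLastPerSubject(), nats.AckExplicit())
	if err != nil {
		log.Fatal(err)
	}

	msgs, err := sub.Fetch(5)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range msgs {
		meta, _ := m.Metadata()
		fmt.Printf("%s at stream seq %d\n", m.Subject, meta.Sequence.Stream)
		_ = m.Ack()
	}
}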
3 changes: 2 additions & 1 deletion server/consumer.go
@@ -4105,6 +4105,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 o.updateSkipped(o.sseq)
 } else {
 o.lss.seqs = o.lss.seqs[1:]
+o.sseq = seq
 }
 pmsg := getJSPubMsgFromPool()
 sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
@@ -5353,7 +5354,7 @@ func (o *consumer) selectStartingSeqNo() {
 if mmp == 1 {
 o.sseq = state.FirstSeq
 } else {
-var filters []string
+filters := make([]string, 0, len(o.subjf))
 if o.subjf == nil {
 filters = append(filters, o.cfg.FilterSubject)
 } else {
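The one-line addition above keeps the consumer's delivery cursor (o.sseq) aligned with the sequence just popped from the last-per-subject skiplist (o.lss.seqs); the second hunk only preallocates the filters slice. A much-simplified model of that bookkeeping, using only the fields the diff and the new test refer to (seqs, resume, sseq), might look like the following; it is an illustration of the idea, not the server's actual types.

package main

import "fmt"

// lastSeqSkipList mirrors the two fields referenced by the diff and the test.
type lastSeqSkipList struct {
	resume uint64   // stream sequence to continue from once seqs is drained
	seqs   []uint64 // remaining last-per-subject sequences, ascending
}

// nextSeq pops the next last-per-subject sequence and keeps the delivery
// cursor on it, as the fixed getNextMsg now does. Once the skiplist is
// drained, delivery continues after the resume point (the new test asserts
// sseq == resume+1 at that stage).
func nextSeq(lss *lastSeqSkipList, sseq *uint64) (uint64, bool) {
	if len(lss.seqs) == 0 {
		*sseq = lss.resume + 1
		return *sseq, false
	}
	seq := lss.seqs[0]
	lss.seqs = lss.seqs[1:]
	*sseq = seq // the added line: without it, interior deletes can strand the cursor
	return seq, true
}

func main() {
	// Sequences 12, 27 and 33 survive MaxMsgsPerSubject enforcement; 40 is
	// the stream's last sequence when the consumer was created.
	lss := &lastSeqSkipList{resume: 40, seqs: []uint64{12, 27, 33}}
	var sseq uint64
	for {
		seq, fromList := nextSeq(lss, &sseq)
		fmt.Printf("deliver seq %d (from skiplist: %v), cursor now %d\n", seq, fromList, sseq)
		if !fromList {
			break
		}
	}
}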
93 changes: 93 additions & 0 deletions server/jetstream_test.go
@@ -20210,3 +20210,96 @@ func TestJetStreamStreamRetentionUpdatesConsumers(t *testing.T) {
})
}
}

func TestJetStreamMaxMsgsPerSubjectAndDeliverLastPerSubject(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

const subjects = 300
const msgs = subjects * 10

_, err := js.AddStream(&nats.StreamConfig{
Name: "test",
Subjects: []string{"foo.>", "bar.>"},
Retention: nats.LimitsPolicy,
Storage: nats.FileStorage,
MaxMsgsPerSubject: 5,
})
require_NoError(t, err)

// First, publish some messages that match the consumer filter. These
// are the messages that we expect to consume a subset of. The random
// sequences give us interior gaps after MaxMsgsPerSubject has been
// enforced which is needed for this test to work.
for range msgs {
subj := fmt.Sprintf("foo.%d", rand.Intn(subjects))
_, err = js.Publish(subj, nil)
require_NoError(t, err)
}

// Then publish some messages on a different subject. These won't be
// matched by the consumer filter.
for range msgs / 10 {
subj := fmt.Sprintf("bar.%d", rand.Intn(subjects/10))
_, err = js.Publish(subj, nil)
require_NoError(t, err)
}

// Add a deliver last per consumer that matches the first batch of
// published messages only. We expect at this point that the skiplist
// of the consumer will be for foo.> messages only, but the resume
// sequence will be the stream last sequence at the time, i.e. the
// last bar.> message.
_, err = js.AddConsumer("test", &nats.ConsumerConfig{
Name: "test_consumer",
FilterSubjects: []string{"foo.>"},
DeliverPolicy: nats.DeliverLastPerSubjectPolicy,
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// Take a copy of the skiplist and the resume value so that we can
// make sure we receive all the messages we expect.
mset, err := s.globalAccount().lookupStream("test")
require_NoError(t, err)
o := mset.lookupConsumer("test_consumer")
o.mu.RLock()
pending := make(map[uint64]struct{}, len(o.lss.seqs))
for _, seq := range o.lss.seqs {
pending[seq] = struct{}{}
}
resume := o.lss.resume
o.mu.RUnlock()

// Now fetch the messages from the consumer.
ps, err := js.PullSubscribe(_EMPTY_, _EMPTY_, nats.Bind("test", "test_consumer"))
require_NoError(t, err)
for range subjects {
msgs, err := ps.Fetch(1)
require_NoError(t, err)
for _, msg := range msgs {
meta, err := msg.Metadata()
require_NoError(t, err)
// We must be expecting this sequence and not have seen it already.
// Once we've seen it, take it out of the map.
_, ok := pending[meta.Sequence.Stream]
require_True(t, ok)
delete(pending, meta.Sequence.Stream)
// Then ack.
require_NoError(t, msg.AckSync())
}
}

// We should have received every message that was in the skiplist.
require_Len(t, len(pending), 0)

// When we've run out of last sequences per subject, the consumer
// should now continue from the resume seq, i.e. the last bar.> message.
o.mu.RLock()
sseq := o.sseq
o.mu.RUnlock()
require_Equal(t, sseq, resume+1)
}
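From the client side, progress past the skiplist can also be observed through consumer info rather than by reaching into server internals. A small follow-up to the nats.go sketch above (js is a JetStream context connected to the same server; the stream and consumer names match the new test):

info, err := js.ConsumerInfo("test", "test_consumer")
if err != nil {
	log.Fatal(err)
}
// Delivered reports the last delivered consumer/stream sequences and AckFloor
// the highest contiguously acked ones. Once the per-subject skiplist has been
// drained and acked, the next delivery comes from after the resume point
// (server-side, the test asserts o.sseq == resume+1).
fmt.Printf("delivered: consumer=%d stream=%d, ack floor: stream=%d\n",
	info.Delivered.Consumer, info.Delivered.Stream, info.AckFloor.Stream)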