Skip to content

Commit b662be6

Browse files
Fix DeliverLastPerSubject consumer policy with interior deletes (#7005)
When the consumer was trying to find the next message from a skiplist, we were not correctly moving `o.sseq` up to the next skiplist sequence, which would result in missed messages and some acks being ignored. This is particularly evident with interior deletes, such as those created by the `MaxMsgsPerSubject` limit which the unit test uses. Signed-off-by: Neil Twigg <[email protected]>
2 parents 0d6507b + 66b84ec commit b662be6

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

server/consumer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4105,6 +4105,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
41054105
o.updateSkipped(o.sseq)
41064106
} else {
41074107
o.lss.seqs = o.lss.seqs[1:]
4108+
o.sseq = seq
41084109
}
41094110
pmsg := getJSPubMsgFromPool()
41104111
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
@@ -5353,7 +5354,7 @@ func (o *consumer) selectStartingSeqNo() {
53535354
if mmp == 1 {
53545355
o.sseq = state.FirstSeq
53555356
} else {
5356-
var filters []string
5357+
filters := make([]string, 0, len(o.subjf))
53575358
if o.subjf == nil {
53585359
filters = append(filters, o.cfg.FilterSubject)
53595360
} else {

server/jetstream_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20210,3 +20210,96 @@ func TestJetStreamStreamRetentionUpdatesConsumers(t *testing.T) {
2021020210
})
2021120211
}
2021220212
}
20213+
20214+
func TestJetStreamMaxMsgsPerSubjectAndDeliverLastPerSubject(t *testing.T) {
20215+
s := RunBasicJetStreamServer(t)
20216+
defer s.Shutdown()
20217+
20218+
nc, js := jsClientConnect(t, s)
20219+
defer nc.Close()
20220+
20221+
const subjects = 300
20222+
const msgs = subjects * 10
20223+
20224+
_, err := js.AddStream(&nats.StreamConfig{
20225+
Name: "test",
20226+
Subjects: []string{"foo.>", "bar.>"},
20227+
Retention: nats.LimitsPolicy,
20228+
Storage: nats.FileStorage,
20229+
MaxMsgsPerSubject: 5,
20230+
})
20231+
require_NoError(t, err)
20232+
20233+
// First, publish some messages that match the consumer filter. These
20234+
// are the messages that we expect to consume a subset of. The random
20235+
// sequences give us interior gaps after MaxMsgsPerSubject has been
20236+
// enforced which is needed for this test to work.
20237+
for range msgs {
20238+
subj := fmt.Sprintf("foo.%d", rand.Intn(subjects))
20239+
_, err = js.Publish(subj, nil)
20240+
require_NoError(t, err)
20241+
}
20242+
20243+
// Then publish some messages on a different subject. These won't be
20244+
// matched by the consumer filter.
20245+
for range msgs / 10 {
20246+
subj := fmt.Sprintf("bar.%d", rand.Intn(subjects/10))
20247+
_, err = js.Publish(subj, nil)
20248+
require_NoError(t, err)
20249+
}
20250+
20251+
// Add a deliver last per consumer that matches the first batch of
20252+
// published messages only. We expect at this point that the skiplist
20253+
// of the consumer will be for foo.> messages only, but the resume
20254+
// sequence will be the stream last sequence at the time, i.e. the
20255+
// last bar.> message.
20256+
_, err = js.AddConsumer("test", &nats.ConsumerConfig{
20257+
Name: "test_consumer",
20258+
FilterSubjects: []string{"foo.>"},
20259+
DeliverPolicy: nats.DeliverLastPerSubjectPolicy,
20260+
AckPolicy: nats.AckExplicitPolicy,
20261+
})
20262+
require_NoError(t, err)
20263+
20264+
// Take a copy of the skiplist and the resume value so that we can
20265+
// make sure we receive all the messages we expect.
20266+
mset, err := s.globalAccount().lookupStream("test")
20267+
require_NoError(t, err)
20268+
o := mset.lookupConsumer("test_consumer")
20269+
o.mu.RLock()
20270+
pending := make(map[uint64]struct{}, len(o.lss.seqs))
20271+
for _, seq := range o.lss.seqs {
20272+
pending[seq] = struct{}{}
20273+
}
20274+
resume := o.lss.resume
20275+
o.mu.RUnlock()
20276+
20277+
// Now fetch the messages from the consumer.
20278+
ps, err := js.PullSubscribe(_EMPTY_, _EMPTY_, nats.Bind("test", "test_consumer"))
20279+
require_NoError(t, err)
20280+
for range subjects {
20281+
msgs, err := ps.Fetch(1)
20282+
require_NoError(t, err)
20283+
for _, msg := range msgs {
20284+
meta, err := msg.Metadata()
20285+
require_NoError(t, err)
20286+
// We must be expecting this sequence and not have seen it already.
20287+
// Once we've seen it, take it out of the map.
20288+
_, ok := pending[meta.Sequence.Stream]
20289+
require_True(t, ok)
20290+
delete(pending, meta.Sequence.Stream)
20291+
// Then ack.
20292+
require_NoError(t, msg.AckSync())
20293+
}
20294+
}
20295+
20296+
// We should have received every message that was in the skiplist.
20297+
require_Len(t, len(pending), 0)
20298+
20299+
// When we've run out of last sequences per subject, the consumer
20300+
// should now continue from the resume seq, i.e. the last bar.> message.
20301+
o.mu.RLock()
20302+
sseq := o.sseq
20303+
o.mu.RUnlock()
20304+
require_Equal(t, sseq, resume+1)
20305+
}

0 commit comments

Comments
 (0)