Skip to content

[FIXED] Consumer skips some messages (WQ / Interest Streams) #6526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5701,6 +5701,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down Expand Up @@ -6426,6 +6427,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(bsubj, SimpleState{
Msgs: 1,
Expand Down Expand Up @@ -8490,8 +8492,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
ss.firstNeedsUpdate = false
ss.lastNeedsUpdate = false
return
}

endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
Expand All @@ -8518,6 +8523,8 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
// Only need to reset ss.lastNeedsUpdate, ss.firstNeedsUpdate is already reset above.
ss.lastNeedsUpdate = false
return
}
buf := mb.cache.buf[li:]
Expand Down Expand Up @@ -8641,6 +8648,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down
1 change: 1 addition & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt
if ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
// Check per subject limits.
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
ms.enforcePerSubjectLimit(subj, ss)
Expand Down
18 changes: 18 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,24 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
expectFirstSeq(6)
require_Equal(t, ss.Last, 6)
expectLastSeq(6)

// We store a new message for ss.Last and remove it after, which marks it to be recalculated.
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)
removed, err = fs.RemoveMsg(8)
require_NoError(t, err)
require_True(t, removed)
// This will be the new ss.Last message, so reset ss.lastNeedsUpdate
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)

// ss.First should remain the same, but ss.Last should equal the last message.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 2)
require_Equal(t, ss.First, 6)
expectFirstSeq(6)
require_Equal(t, ss.Last, 9)
expectLastSeq(9)
},
)
}
Expand Down
Loading