Skip to content

Commit 7bdd728

Browse files
[FIXED] Consumer skips some messages
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 7ca0fe3 commit 7bdd728

File tree

3 files changed

+26
-0
lines changed

3 files changed

+26
-0
lines changed

server/filestore.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5701,6 +5701,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
57015701
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
57025702
ss.Msgs++
57035703
ss.Last = seq
5704+
ss.lastNeedsUpdate = false
57045705
} else {
57055706
mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
57065707
}
@@ -6426,6 +6427,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
64266427
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
64276428
ss.Msgs++
64286429
ss.Last = seq
6430+
ss.lastNeedsUpdate = false
64296431
} else {
64306432
mb.fss.Insert(bsubj, SimpleState{
64316433
Msgs: 1,
@@ -8490,8 +8492,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
84908492
}
84918493
if startSlot >= len(mb.cache.idx) {
84928494
ss.First = ss.Last
8495+
ss.firstNeedsUpdate = false
8496+
ss.lastNeedsUpdate = false
84938497
return
84948498
}
8499+
84958500
endSlot := int(ss.Last - mb.cache.fseq)
84968501
if endSlot < 0 {
84978502
endSlot = 0
@@ -8518,6 +8523,7 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
85188523
li := int(bi) - mb.cache.off
85198524
if li >= len(mb.cache.buf) {
85208525
ss.First = ss.Last
8526+
ss.lastNeedsUpdate = false
85218527
return
85228528
}
85238529
buf := mb.cache.buf[li:]
@@ -8641,6 +8647,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
86418647
if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil {
86428648
ss.Msgs++
86438649
ss.Last = seq
8650+
ss.lastNeedsUpdate = false
86448651
} else {
86458652
mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq})
86468653
}

server/memstore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt
206206
if ss != nil {
207207
ss.Msgs++
208208
ss.Last = seq
209+
ss.lastNeedsUpdate = false
209210
// Check per subject limits.
210211
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
211212
ms.enforcePerSubjectLimit(subj, ss)

server/store_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,24 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
254254
expectFirstSeq(6)
255255
require_Equal(t, ss.Last, 6)
256256
expectLastSeq(6)
257+
258+
// We store a new message for ss.Last and remove it after, which marks it to be recalculated.
259+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
260+
require_NoError(t, err)
261+
removed, err = fs.RemoveMsg(8)
262+
require_NoError(t, err)
263+
require_True(t, removed)
264+
// This will be the new ss.Last message, so reset ss.lastNeedsUpdate
265+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
266+
require_NoError(t, err)
267+
268+
// ss.First should remain the same, but ss.Last should equal the last message.
269+
ss = getSubjectState()
270+
require_Equal(t, ss.Msgs, 2)
271+
require_Equal(t, ss.First, 6)
272+
expectFirstSeq(6)
273+
require_Equal(t, ss.Last, 9)
274+
expectLastSeq(9)
257275
},
258276
)
259277
}

0 commit comments

Comments
 (0)