diff --git a/server/filestore.go b/server/filestore.go index 622341970cd..b4a9a918a7d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5701,6 +5701,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } @@ -6426,6 +6427,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(bsubj, SimpleState{ Msgs: 1, @@ -8490,8 +8492,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last + ss.firstNeedsUpdate = false + ss.lastNeedsUpdate = false return } + endSlot := int(ss.Last - mb.cache.fseq) if endSlot < 0 { endSlot = 0 @@ -8518,6 +8523,8 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { li := int(bi) - mb.cache.off if li >= len(mb.cache.buf) { ss.First = ss.Last + // Only need to reset ss.lastNeedsUpdate, ss.firstNeedsUpdate is already reset above. + ss.lastNeedsUpdate = false return } buf := mb.cache.buf[li:] @@ -8641,6 +8648,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } diff --git a/server/memstore.go b/server/memstore.go index 821489de2d8..79e3e37ee92 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -206,6 +206,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, tt if ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false // Check per subject limits. if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) { ms.enforcePerSubjectLimit(subj, ss) diff --git a/server/store_test.go b/server/store_test.go index c62988ae02f..b5a894fdf87 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -254,6 +254,24 @@ func TestStoreSubjectStateConsistency(t *testing.T) { expectFirstSeq(6) require_Equal(t, ss.Last, 6) expectLastSeq(6) + + // We store a new message for ss.Last and remove it after, which marks it to be recalculated. + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + removed, err = fs.RemoveMsg(8) + require_NoError(t, err) + require_True(t, removed) + // This will be the new ss.Last message, so reset ss.lastNeedsUpdate + _, _, err = fs.StoreMsg("foo", nil, nil, 0) + require_NoError(t, err) + + // ss.First should remain the same, but ss.Last should equal the last message. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 2) + require_Equal(t, ss.First, 6) + expectFirstSeq(6) + require_Equal(t, ss.Last, 9) + expectLastSeq(9) }, ) }