From 79b41b122511dfa80e0812e2a0f223a818bb2268 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 09:42:56 +0100 Subject: [PATCH 1/3] Improve per-subject state performance Signed-off-by: Maurice van Veen --- server/filestore.go | 19 +++---------------- server/memstore.go | 19 +++---------------- 2 files changed, 6 insertions(+), 32 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 20f66579170..9205acb5f94 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7946,22 +7946,6 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - // Update first if we need to, we must check if this removal is about what's going to be ss.First - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - // If we're removing the first message, we must recalculate again. - // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. - if ss.First == seq { - mb.recalculateFirstForSubj(subj, ss.First, ss) - } - ss.Last = ss.First - ss.firstNeedsUpdate = false - return - } - // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -8012,6 +7996,9 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si continue } ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + } return } } diff --git a/server/memstore.go b/server/memstore.go index b183d117da5..4aa5ebbae6f 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1408,22 +1408,6 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // Only one left. - if ss.Msgs == 1 { - // Update first if we need to, we must check if this removal is about what's going to be ss.First - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) - } - // If we're removing the first message, we must recalculate again. - // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it. - if ss.First == seq { - ms.recalculateFirstForSubj(subj, ss.First, ss) - } - ss.Last = ss.First - ss.firstNeedsUpdate = false - return - } - // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate } @@ -1438,6 +1422,9 @@ func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si for ; tseq <= ss.Last; tseq++ { if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { ss.First = tseq + if ss.Msgs == 1 { + ss.Last = tseq + } ss.firstNeedsUpdate = false return } From c1dacd4680447e71b04c0b7fe384a1917cba9e8f Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 15:57:28 +0100 Subject: [PATCH 2/3] Don't mark deletes, we don't recalculate in fs.removePerSubject anymore Signed-off-by: Maurice van Veen --- server/filestore.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 9205acb5f94..78080c85e08 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -7498,9 +7498,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) { // Update fss smb.removeSeqPerSubject(sm.subj, mseq) fs.removePerSubject(sm.subj) - // Need to mark the sequence as deleted. Otherwise, recalculating ss.First - // for per-subject info would be able to find it still. - smb.dmap.Insert(mseq) } } From 3304ee79a2f006a668ed92e33d50cb30fa00fa09 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 10 Dec 2024 18:07:28 +0100 Subject: [PATCH 3/3] [FIXED] ss.Last was not kept up-to-date Signed-off-by: Maurice van Veen --- server/filestore.go | 145 +++++++++++++++++++++++++++------------ server/filestore_test.go | 2 +- server/memstore.go | 72 ++++++++++++------- server/store.go | 2 + server/store_test.go | 39 ++++++++--- 5 files changed, 181 insertions(+), 79 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 78080c85e08..81966a23007 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2313,8 +2313,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2443,8 +2443,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(bytesToString(bsubj), ss) } if sseq <= ss.First { update(ss) @@ -2743,8 +2743,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -3047,8 +3047,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3335,8 +3335,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } if sseq <= ss.First { t += ss.Msgs @@ -4009,8 +4009,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4141,8 +4141,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && ss.firstNeedsUpdate { - mb.recalculateFirstForSubj(subj, ss.First, ss) + if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { + mb.recalculateForSubj(subj, ss) } mb.mu.Unlock() if ss == nil { @@ -7943,13 +7943,14 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalulate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { +func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { // Need to make sure messages are loaded. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7957,46 +7958,100 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si } } - // Mark first as updated. - ss.firstNeedsUpdate = false - - startSlot := int(startSeq - mb.cache.fseq) + startSlot := int(ss.First - mb.cache.fseq) + if startSlot < 0 { + startSlot = 0 + } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } else if startSlot < 0 { - startSlot = 0 } - - fseq := startSeq + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq + endSlot := int(ss.Last - mb.cache.fseq) + if endSlot < 0 { + endSlot = 0 + } + if endSlot >= len(mb.cache.idx) || startSlot > endSlot { + return } + var le = binary.LittleEndian - for slot := startSlot; slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue + if ss.firstNeedsUpdate { + // Mark first as updated. + ss.firstNeedsUpdate = false + + fseq := ss.First + 1 + if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { + fseq = mbFseq + } + for slot := startSlot; slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue + } + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + ss.First = seq + if ss.Msgs == 1 { + ss.Last = seq + ss.lastNeedsUpdate = false + return + } + // Skip the start slot ahead, if we need to recalculate last we can stop early. + startSlot = slot + break + } } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return + } + if ss.lastNeedsUpdate { + // Mark last as updated. + ss.lastNeedsUpdate = false + + lseq := ss.Last - 1 + if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { + lseq = mbLseq } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + for slot := endSlot; slot >= startSlot; slot-- { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. continue } - ss.First = seq - if ss.Msgs == 1 { + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + // Can't overwrite ss.Last, just skip. + return + } + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { + continue + } + // Sequence should never be lower, but guard against it nonetheless. + if seq < ss.First { + seq = ss.First + } ss.Last = seq + if ss.Msgs == 1 { + ss.First = seq + ss.firstNeedsUpdate = false + } + return } - return } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 32dab26a2e0..34a7862440d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5031,7 +5031,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { mb.clearCacheAndOffset() // Now call with start sequence of 1, the old one // This will panic without the fix. - mb.recalculateFirstForSubj("foo", 1, ss) + mb.recalculateForSubj("foo", ss) // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } diff --git a/server/memstore.go b/server/memstore.go index 4aa5ebbae6f..eb3ff77762e 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -141,8 +141,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -428,8 +428,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -583,8 +583,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subjs, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subjs, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -721,8 +721,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate { - ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + if fss.firstNeedsUpdate || fss.lastNeedsUpdate { + ms.recalculateForSubj(bytesToString(subj), fss) } if sseq <= fss.First { update(fss) @@ -839,8 +839,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1313,8 +1313,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate { - ms.recalculateFirstForSubj(subj, ss.First, ss) + if ss.firstNeedsUpdate || ss.lastNeedsUpdate { + ms.recalculateForSubj(subj, ss) } if ss.First < fseq { fseq = ss.First @@ -1408,25 +1408,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first sequence when needed. + // We can lazily calculate the first/last sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first sequence for this subject in this block. +// Will recalculate the first and/or last sequence for this subject. // Lock should be held. -func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { - tseq := startSeq + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - if ss.Msgs == 1 { +func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { + if ss.firstNeedsUpdate { + tseq := ss.First + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + ss.firstNeedsUpdate = false + if ss.Msgs == 1 { + ss.Last = tseq + ss.lastNeedsUpdate = false + return + } + break + } + } + } + if ss.lastNeedsUpdate { + tseq := ss.Last - 1 + if tseq > ms.state.LastSeq { + tseq = ms.state.LastSeq + } + for ; tseq >= ss.First; tseq-- { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { ss.Last = tseq + ss.lastNeedsUpdate = false + if ss.Msgs == 1 { + ss.First = tseq + ss.firstNeedsUpdate = false + } + return } - ss.firstNeedsUpdate = false - return } } } diff --git a/server/store.go b/server/store.go index ed972922ffe..fb2381a1e50 100644 --- a/server/store.go +++ b/server/store.go @@ -169,6 +169,8 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool + // Internal usage for when the last needs to be updated before use. + lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. diff --git a/server/store_test.go b/server/store_test.go index d2c3481a1a5..168b488d62c 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -152,6 +152,19 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := fs.SubjectsState("foo") return ss["foo"] } + var smp StoreMsg + expectFirstSeq := func(eseq uint64) { + t.Helper() + sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } + expectLastSeq := func(eseq uint64) { + t.Helper() + sm, err := fs.LoadLastMsg("foo", &smp) + require_NoError(t, err) + require_Equal(t, sm.seq, eseq) + } // Publish an initial batch of messages. for i := 0; i < 4; i++ { @@ -163,7 +176,9 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss := getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 1) + expectFirstSeq(1) require_Equal(t, ss.Last, 4) + expectLastSeq(4) // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. removed, err := fs.RemoveMsg(1) @@ -174,29 +189,35 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 3) require_Equal(t, ss.First, 2) + expectFirstSeq(2) require_Equal(t, ss.Last, 4) + expectLastSeq(4) - // Remove last message. + // Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate. removed, err = fs.RemoveMsg(4) require_NoError(t, err) require_True(t, removed) - // ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases. + // Will update last, so corrects to 3. ss = getSubjectState() require_Equal(t, ss.Msgs, 2) require_Equal(t, ss.First, 2) - require_Equal(t, ss.Last, 4) + expectFirstSeq(2) + require_Equal(t, ss.Last, 3) + expectLastSeq(3) // Remove first message again. removed, err = fs.RemoveMsg(2) require_NoError(t, err) require_True(t, removed) - // Since we only have one message left, must update ss.First and set ss.Last to equal. + // Since we only have one message left, must update ss.First and ensure ss.Last equals. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 3) + expectFirstSeq(3) require_Equal(t, ss.Last, 3) + expectLastSeq(3) // Publish some more messages so we can test another scenario. for i := 0; i < 3; i++ { @@ -208,7 +229,9 @@ func TestStoreSubjectStateConsistency(t *testing.T) { ss = getSubjectState() require_Equal(t, ss.Msgs, 4) require_Equal(t, ss.First, 3) + expectFirstSeq(3) require_Equal(t, ss.Last, 7) + expectLastSeq(7) // Remove last sequence, ss.Last is lazy so doesn't get updated. removed, err = fs.RemoveMsg(7) @@ -220,18 +243,18 @@ func TestStoreSubjectStateConsistency(t *testing.T) { require_NoError(t, err) require_True(t, removed) - // Remove (now) first sequence, but because ss.First is lazy we first need to recalculate - // to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First - // yet again, since ss.Last is lazy and is not correct. + // Remove (now) first sequence. Both ss.First and ss.Last are lazy and both need to be recalculated later. removed, err = fs.RemoveMsg(5) require_NoError(t, err) require_True(t, removed) - // ss.First should equal ss.Last, last should have been updated now. + // ss.First and ss.Last should both be recalculated and equal each other. ss = getSubjectState() require_Equal(t, ss.Msgs, 1) require_Equal(t, ss.First, 6) + expectFirstSeq(6) require_Equal(t, ss.Last, 6) + expectLastSeq(6) }, ) }