Skip to content

[FIXED] Improve per-subject state performance & keep ss.Last up-to-date #6235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 102 additions & 63 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2313,8 +2313,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
Expand Down Expand Up @@ -2443,8 +2443,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
}
if sseq <= ss.First {
update(ss)
Expand Down Expand Up @@ -2743,8 +2743,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
Expand Down Expand Up @@ -3047,8 +3047,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -3335,8 +3335,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
Expand Down Expand Up @@ -4009,8 +4009,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
Expand Down Expand Up @@ -4141,8 +4141,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
if ss == nil {
Expand Down Expand Up @@ -7498,9 +7498,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
// for per-subject info would be able to find it still.
smb.dmap.Insert(mseq)
}
}

Expand Down Expand Up @@ -7946,73 +7943,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalulate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
} else if startSlot < 0 {
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
}

var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
ss.First = seq
return
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5031,7 +5031,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateFirstForSubj("foo", 1, ss)
mb.recalculateForSubj("foo", ss)
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
Expand Down
89 changes: 49 additions & 40 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
Expand Down Expand Up @@ -428,8 +428,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var totalSkipped uint64
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
Expand Down Expand Up @@ -583,8 +583,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subjs, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
Expand Down Expand Up @@ -721,8 +721,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
var totalSkipped uint64
// We will track start and end sequences as we go.
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
Expand Down Expand Up @@ -839,8 +839,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if !ms.removeMsg(ss.First, false) {
break
Expand Down Expand Up @@ -1313,8 +1313,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if !ok {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if ss.First < fseq {
fseq = ss.First
Expand Down Expand Up @@ -1408,38 +1408,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// Only one left.
if ss.Msgs == 1 {
// Update first if we need to, we must check if this removal is about what's going to be ss.First
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
// If we're removing the first message, we must recalculate again.
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
if ss.First == seq {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
ss.Last = ss.First
ss.firstNeedsUpdate = false
return
}

// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
return
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate {
tseq := ss.First + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
return
}
break
}
}
}
if ss.lastNeedsUpdate {
tseq := ss.Last - 1
if tseq > ms.state.LastSeq {
tseq = ms.state.LastSeq
}
for ; tseq >= ss.First; tseq-- {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.Last = tseq
ss.lastNeedsUpdate = false
if ss.Msgs == 1 {
ss.First = tseq
ss.firstNeedsUpdate = false
}
return
}
}
}
}
Expand Down
Loading
Loading