diff --git a/server/client.go b/server/client.go
index 64c0c3b9429..c6e76a954e2 100644
--- a/server/client.go
+++ b/server/client.go
@@ -4329,7 +4329,7 @@ func sliceHeader(key string, hdr []byte) []byte {
 	if len(hdr) == 0 {
 		return nil
 	}
-	index := bytes.Index(hdr, []byte(key))
+	index := bytes.Index(hdr, stringToBytes(key))
 	hdrLen := len(hdr)
 	// Check that we have enough characters, this will handle the -1 case of the key not
 	// being found and will also handle not having enough characters for trailing CRLF.
diff --git a/server/consumer.go b/server/consumer.go
index a441663bf8e..4852c9b59fa 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -5322,9 +5322,6 @@ func (o *consumer) selectStartingSeqNo() {
 			if mmp == 1 {
 				o.sseq = state.FirstSeq
 			} else {
-				// A threshold for when we switch from get last msg to subjects state.
-				const numSubjectsThresh = 256
-				lss := &lastSeqSkipList{resume: state.LastSeq}
 				var filters []string
 				if o.subjf == nil {
 					filters = append(filters, o.cfg.FilterSubject)
@@ -5333,24 +5330,10 @@ func (o *consumer) selectStartingSeqNo() {
 				} else {
 					filters = append(filters, filter.subject)
 				}
 			}
-			for _, filter := range filters {
-				if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
-					var smv StoreMsg
-					for subj := range st {
-						if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
-							lss.seqs = append(lss.seqs, sm.seq)
-						}
-					}
-				} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
-					for _, ss := range mss {
-						lss.seqs = append(lss.seqs, ss.Last)
-					}
-				}
-			}
-			// Sort the skip list if needed.
-			if len(lss.seqs) > 1 {
-				slices.Sort(lss.seqs)
-			}
+
+			lss := &lastSeqSkipList{resume: state.LastSeq}
+			lss.seqs, _ = o.mset.store.MultiLastSeqs(filters, 0, 0)
+
 			if len(lss.seqs) == 0 {
 				o.sseq = state.LastSeq
 			} else {
@@ -5874,7 +5857,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	// Check if this message was pending.
 	p, wasPending := o.pending[sseq]
-	rdc := o.deliveryCount(sseq)
+	var rdc uint64
+	if wasPending {
+		rdc = o.deliveryCount(sseq)
+	}
 	o.mu.Unlock()
diff --git a/server/filestore.go b/server/filestore.go
index cd62de47815..78a2c9345a5 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -203,6 +203,7 @@ type fileStore struct {
 	firstMoved bool
 	ttls       *thw.HashWheel
 	sdm        *SDMMeta
+	lpex       time.Time // Last PurgeEx call.
 }
 
 // Represents a message store block and its data.
@@ -1909,7 +1910,7 @@ func (fs *fileStore) recoverTTLState() error {
 				// beginning and see if we need to skip this one too.
 				goto retry
 			}
-			msg, _, err := mb.fetchMsg(seq, &sm)
+			msg, _, err := mb.fetchMsgNoCopy(seq, &sm)
 			if err != nil {
 				fs.warn("Error loading msg seq %d for recovering TTL: %s", seq, err)
 				continue
@@ -2192,7 +2193,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
 		// Walk messages and remove if expired.
 		fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
 		for seq := fseq; seq <= lseq; seq++ {
-			sm, err := mb.cacheLookup(seq, &smv)
+			sm, err := mb.cacheLookupNoCopy(seq, &smv)
 			// Process interior deleted msgs.
 			if err == errDeletedMsg {
 				// Update dmap.
@@ -2337,7 +2338,7 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
 	// Linear search, hence the dumb part..
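+	// Timestamps are non-decreasing with sequence here, so the first message
+	// with ts at or past t is the answer.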
 	ts := t.UnixNano()
 	for seq := fseq; seq <= lseq; seq++ {
-		sm, _, _ := mb.fetchMsg(seq, &smv)
+		sm, _, _ := mb.fetchMsgNoCopy(seq, &smv)
 		if sm != nil && sm.ts >= ts {
 			return sm.seq
 		}
@@ -2352,7 +2353,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg)
 	var updateLLTS bool
 	defer func() {
 		if updateLLTS {
-			mb.llts = time.Now().UnixNano()
+			mb.llts = getAccessTime()
 		}
 		mb.mu.Unlock()
 	}()
@@ -2467,7 +2468,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
 	var updateLLTS bool
 	defer func() {
 		if updateLLTS {
-			mb.llts = time.Now().UnixNano()
+			mb.llts = getAccessTime()
 		}
 		mb.mu.Unlock()
 	}()
@@ -2481,7 +2482,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
 		didLoad = true
 	}
 	// Mark fss activity.
-	mb.lsts = time.Now().UnixNano()
+	mb.lsts = getAccessTime()
 
 	if filter == _EMPTY_ {
 		filter = fwcs
@@ -2642,8 +2643,7 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
 	}
 
 	if filter == _EMPTY_ {
-		filter = fwcs
-		wc = true
+		filter, wc = fwcs, true
 	}
 
 	update := func(ss *SimpleState) {
@@ -2659,39 +2659,39 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
 	// Make sure we have fss loaded.
 	mb.ensurePerSubjectInfoLoaded()
 
-	_tsa, _fsa := [32]string{}, [32]string{}
-	tsa, fsa := _tsa[:0], _fsa[:0]
-	fsa = tokenizeSubjectIntoSlice(fsa[:0], filter)
-
-	// 1. See if we match any subs from fss.
-	// 2. If we match and the sseq is past ss.Last then we can use meta only.
-	// 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending.
+	var havePartial bool
 
-	isMatch := func(subj string) bool {
-		if !wc {
-			return subj == filter
+	// If we are not a wildcard just use Find() here. Avoids allocations.
+	if !wc {
+		if ss, ok := mb.fss.Find(stringToBytes(filter)); ok && ss != nil {
+			if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+				mb.recalculateForSubj(filter, ss)
+			}
+			if sseq <= ss.First {
+				update(ss)
+			} else if sseq <= ss.Last {
+				// We matched but it's a partial.
+				havePartial = true
+			}
 		}
-		tsa = tokenizeSubjectIntoSlice(tsa[:0], subj)
-		return isSubsetMatchTokenized(tsa, fsa)
+	} else {
+		mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
+			if havePartial {
+				// If we already found a partial then don't do anything else.
+				return
+			}
+			if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+				mb.recalculateForSubj(bytesToString(bsubj), ss)
+			}
+			if sseq <= ss.First {
+				update(ss)
+			} else if sseq <= ss.Last {
+				// We matched but it's a partial.
+				havePartial = true
+			}
+		})
 	}
-	var havePartial bool
-	mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
-		if havePartial {
-			// If we already found a partial then don't do anything else.
-			return
-		}
-		if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
-			mb.recalculateForSubj(bytesToString(bsubj), ss)
-		}
-		if sseq <= ss.First {
-			update(ss)
-		} else if sseq <= ss.Last {
-			// We matched but its a partial.
-			havePartial = true
-		}
-	})
-	// If we did not encounter any partials we can return here.
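+	// A partial means sseq falls inside a matched subject's [First, Last] range,
+	// so the per-subject state alone cannot tell us how many messages remain at
+	// or above sseq and we must fall back to the scan below.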
 	if !havePartial {
 		return total, first, last
@@ -2708,9 +2708,27 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
 		}
 		shouldExpire = true
 	}
 
+	_tsa, _fsa := [32]string{}, [32]string{}
+	tsa, fsa := _tsa[:0], _fsa[:0]
+	var isMatch func(subj string) bool
+
+	if !wc {
+		isMatch = func(subj string) bool { return subj == filter }
+	} else {
+		fsa = tokenizeSubjectIntoSlice(fsa[:0], filter)
+		isMatch = func(subj string) bool {
+			tsa = tokenizeSubjectIntoSlice(tsa[:0], subj)
+			return isSubsetMatchTokenized(tsa, fsa)
+		}
+	}
+
+	// 1. See if we match any subs from fss.
+	// 2. If we match and the sseq is past ss.Last then we can use meta only.
+	// 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending.
+
 	var smv StoreMsg
 	for seq, lseq := sseq, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ {
-		sm, _ := mb.cacheLookup(seq, &smv)
+		sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 		if sm == nil {
 			continue
 		}
@@ -2978,7 +2996,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 			shouldExpire = true
 		}
 		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.lsts = getAccessTime()
 		mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
 			subj := string(bsubj)
 			if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
@@ -3007,6 +3025,69 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 	return fss
 }
 
+// AllLastSeqs will return a sorted list of last sequences for all subjects.
+func (fs *fileStore) AllLastSeqs() ([]uint64, error) {
+	fs.mu.RLock()
+	defer fs.mu.RUnlock()
+
+	if fs.state.Msgs == 0 || fs.noTrackSubjects() {
+		return nil, nil
+	}
+
+	numSubjects := fs.psim.Size()
+	seqs := make([]uint64, 0, numSubjects)
+	subs := make(map[string]struct{}, numSubjects)
+
+	for i := len(fs.blks) - 1; i >= 0; i-- {
+		if len(subs) == numSubjects {
+			break
+		}
+		mb := fs.blks[i]
+		mb.mu.Lock()
+
+		var shouldExpire bool
+		if mb.fssNotLoaded() {
+			// Make sure we have fss loaded.
+			mb.loadMsgsWithLock()
+			shouldExpire = true
+		}
+
+		mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool {
+			// Check if this subject has already been processed and accounted for.
+			if _, ok := subs[string(bsubj)]; !ok {
+				seqs = append(seqs, ss.Last)
+				subs[string(bsubj)] = struct{}{}
+			}
+			return true
+		})
+		if shouldExpire {
+			// Expire this cache before moving on.
+			mb.tryForceExpireCacheLocked()
+		}
+		mb.mu.Unlock()
+	}
+
+	slices.Sort(seqs)
+	return seqs, nil
+}
+
+// Helper to determine if the filter(s) represent all the subjects.
+// Most clients send in subjects even if they match the stream's ingest subjects.
+// Lock should be held.
+func (fs *fileStore) filterIsAll(filters []string) bool {
+	if len(filters) != len(fs.cfg.Subjects) {
+		return false
+	}
+	// Sort so we can compare.
+	slices.Sort(filters)
+	for i, subj := range filters {
+		if !subjectIsSubsetMatch(fs.cfg.Subjects[i], subj) {
+			return false
+		}
+	}
+	return true
+}
+
 // MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters.
 // We will not exceed the maxSeq, which if 0 becomes the store's last sequence.
 func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) {
@@ -3017,6 +3098,11 @@
 		return nil, nil
 	}
 
+	// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
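+	// e.g. for a stream on ["foo.*"], a filter list of ["foo.*"] (or [">"]) can
+	// short circuit here, while a narrower filter like ["foo.1"] cannot.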
+	if maxSeq == 0 && maxAllowed <= 0 && fs.filterIsAll(filters) {
+		return fs.AllLastSeqs()
+	}
+
 	lastBlkIndex := len(fs.blks) - 1
 	lastMB := fs.blks[lastBlkIndex]
@@ -3027,7 +3113,7 @@
 		// Update last mb index if not last seq.
 		lastBlkIndex, lastMB = fs.selectMsgBlockWithIndex(maxSeq)
 	}
-	//Make sure non-nil
+	// Make sure non-nil
 	if lastMB == nil {
 		return nil, nil
 	}
@@ -3038,26 +3124,24 @@
 	lastMB.mu.RUnlock()
 
 	subs := make(map[string]*psi)
-	ltSeen := make(map[string]uint32)
+	var numLess int
+	var maxBlk uint32
+
 	for _, filter := range filters {
 		fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
-			s := string(subj)
-			subs[s] = psi
+			subs[string(subj)] = psi
 			if psi.lblk < lastMBIndex {
-				ltSeen[s] = psi.lblk
+				numLess++
+				if psi.lblk > maxBlk {
+					maxBlk = psi.lblk
+				}
 			}
 		})
 	}
 
 	// If all subjects have a lower last index, select the largest for our walk backwards.
-	if len(ltSeen) == len(subs) {
-		max := uint32(0)
-		for _, mbi := range ltSeen {
-			if mbi > max {
-				max = mbi
-			}
-		}
-		lastMB = fs.bim[max]
+	if numLess == len(subs) {
+		lastMB = fs.bim[maxBlk]
 	}
 
 	// Collect all sequences needed.
@@ -3076,42 +3160,50 @@
 		// We can start properly looking here.
 		mb.mu.Lock()
 		mb.ensurePerSubjectInfoLoaded()
-		for subj, psi := range subs {
-			if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
-				if ss.Last <= maxSeq {
-					seqs = append(seqs, ss.Last)
-					delete(subs, subj)
-				} else {
-					// Need to search for it since last is > maxSeq.
-					if mb.cacheNotLoaded() {
-						mb.loadMsgsWithLock()
-					}
-					var smv StoreMsg
-					fseq := atomic.LoadUint64(&mb.first.seq)
-					for seq := maxSeq; seq >= fseq; seq-- {
-						sm, _ := mb.cacheLookup(seq, &smv)
-						if sm == nil || sm.subj != subj {
-							continue
-						}
-						seqs = append(seqs, sm.seq)
-						delete(subs, subj)
-						break
+
+		// Iterate the fss and check against our subs. We will delete from subs as we add.
+		// Once len(subs) == 0 we are done.
+		mb.fss.IterFast(func(bsubj []byte, ss *SimpleState) bool {
+			// Already been processed and accounted for, or was not matched in the first place.
+			if subs[string(bsubj)] == nil {
+				return true
+			}
+			// Check if we need to recalculate. We only care about the last sequence.
+			if ss.lastNeedsUpdate {
+				// mb is already loaded into the cache so should be fast-ish.
+				mb.recalculateForSubj(bytesToString(bsubj), ss)
+			}
+			// If we are equal or below just add to seqs slice.
+			if ss.Last <= maxSeq {
+				seqs = append(seqs, ss.Last)
+				delete(subs, bytesToString(bsubj))
+			} else {
+				// Need to search for the real last since recorded last is > maxSeq.
+				if mb.cacheNotLoaded() {
+					mb.loadMsgsWithLock()
+				}
+				var smv StoreMsg
+				fseq := atomic.LoadUint64(&mb.first.seq)
+				lseq := min(atomic.LoadUint64(&mb.last.seq), maxSeq)
+				ssubj := bytesToString(bsubj)
+				for seq := lseq; seq >= fseq; seq-- {
+					sm, _ := mb.cacheLookupNoCopy(seq, &smv)
+					if sm == nil || sm.subj != ssubj {
+						continue
 					}
+					seqs = append(seqs, sm.seq)
+					delete(subs, ssubj)
+					break
 				}
-			} else if mb.index <= psi.fblk {
-				// Track which subs are no longer applicable, meaning we will not find a valid msg at this point.
-				delete(subs, subj)
 			}
-			// TODO(dlc) we could track lblk like above in case some subs are very far apart.
-			// Not too bad if fss loaded since we will skip over quickly with it loaded, but might be worth it.
-		}
+			return true
+		})
 		mb.mu.Unlock()
 
 		// If maxAllowed was specified check that we will not exceed that.
 		if maxAllowed > 0 && len(seqs) > maxAllowed {
 			return nil, ErrTooManyResults
 		}
-	}
 
 	if len(seqs) == 0 {
 		return nil, nil
@@ -3233,7 +3325,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 				updateLLTS = true
 				continue
 			}
-			sm, _ := mb.cacheLookup(seq, &smv)
+			sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 			if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] {
 				continue
 			}
@@ -3259,7 +3351,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 			mb.tryForceExpireCacheLocked()
 		}
 		if updateLLTS {
-			mb.llts = time.Now().UnixNano()
+			mb.llts = getAccessTime()
 		}
 		mb.mu.Unlock()
 		return total, validThrough
@@ -3285,7 +3377,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 			shouldExpire = true
 		}
 		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.lsts = getAccessTime()
 
 		var t uint64
 		var havePartial bool
@@ -3321,7 +3413,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 			}
 			var smv StoreMsg
 			for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ {
-				if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) {
+				if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && isMatch(sm.subj) {
 					t++
 				}
 			}
@@ -3385,7 +3477,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 				shouldExpire = true
 			}
 			// Mark fss activity.
-			mb.lsts = time.Now().UnixNano()
+			mb.lsts = getAccessTime()
 
 			mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
 				adjust += ss.Msgs
@@ -3409,7 +3501,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 					updateLLTS = true
 					continue
 				}
-				sm, _ := mb.cacheLookup(seq, &smv)
+				sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 				if sm == nil || sm.subj == _EMPTY_ {
 					continue
 				}
@@ -3425,7 +3517,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 				mb.tryForceExpireCacheLocked()
 			}
 			if updateLLTS {
-				mb.llts = time.Now().UnixNano()
+				mb.llts = getAccessTime()
 			}
 			mb.mu.Unlock()
 		}
@@ -3542,7 +3634,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
 				updateLLTS = true
 				continue
 			}
-			sm, _ := mb.cacheLookup(seq, &smv)
+			sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 			if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] {
 				continue
 			}
@@ -3568,7 +3660,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
 			mb.tryForceExpireCacheLocked()
 		}
 		if updateLLTS {
-			mb.llts = time.Now().UnixNano()
+			mb.llts = getAccessTime()
 		}
 		mb.mu.Unlock()
 		return total, validThrough
@@ -3593,7 +3685,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
 			shouldExpire = true
 		}
 		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.lsts = getAccessTime()
 
 		var t uint64
 		var havePartial bool
@@ -3635,7 +3727,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
 					updateLLTS = true
 					continue
 				}
-				if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) {
+				if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && isMatch(sm.subj) {
 					t++
 					updateLLTS = false // cacheLookup already updated it.
 				}
@@ -3646,7 +3738,7 @@
 			mb.tryForceExpireCacheLocked()
 		}
 		if updateLLTS {
-			mb.llts = time.Now().UnixNano()
+			mb.llts = getAccessTime()
 		}
 		mb.mu.Unlock()
 		total += t
@@ -3706,7 +3798,7 @@
 				shouldExpire = true
 			}
 			// Mark fss activity.
-			mb.lsts = time.Now().UnixNano()
+			mb.lsts = getAccessTime()
 			IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
 				adjust += ss.Msgs
 			})
@@ -3729,7 +3821,7 @@
 					updateLLTS = true
 					continue
 				}
-				sm, _ := mb.cacheLookup(seq, &smv)
+				sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 				if sm == nil || sm.subj == _EMPTY_ {
 					continue
 				}
@@ -3745,7 +3837,7 @@
 				mb.tryForceExpireCacheLocked()
 			}
 			if updateLLTS {
-				mb.llts = time.Now().UnixNano()
+				mb.llts = getAccessTime()
 			}
 			mb.mu.Unlock()
 		}
@@ -3830,7 +3922,7 @@ func (mb *msgBlock) setupWriteCache(buf []byte) {
 	if fi != nil {
 		mb.cache.off = int(fi.Size())
 	}
-	mb.llts = time.Now().UnixNano()
+	mb.llts = getAccessTime()
 	mb.startCacheExpireTimer()
 }
 
@@ -3861,8 +3953,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
 	mb.fss = stree.NewSubjectTree[SimpleState]()
 
 	// Set cache time to creation time to start.
-	ts := time.Now().UnixNano()
-	mb.llts, mb.lwts = 0, ts
+	mb.llts, mb.lwts = 0, getAccessTime()
 	// Remember our last sequence number.
 	atomic.StoreUint64(&mb.first.seq, fs.state.LastSeq+1)
 	atomic.StoreUint64(&mb.last.seq, fs.state.LastSeq)
@@ -4120,8 +4211,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
 		return
 	}
 	var needsRecord bool
-
-	nowts := now.UnixNano()
+	nowts := getAccessTime()
 
 	mb.mu.Lock()
 	// If we are empty can just do meta.
@@ -4160,21 +4250,13 @@ func (fs *fileStore) SkipMsg() uint64 {
 	defer fs.mu.Unlock()
 
 	// Grab our current last message block.
-	mb := fs.lmb
-	if mb == nil || mb.msgs > 0 && mb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
-		if mb != nil && fs.fcfg.Compression != NoCompression {
-			// We've now reached the end of this message block, if we want
-			// to compress blocks then now's the time to do it.
-			go mb.recompressOnDiskIfNeeded()
-		}
-		var err error
-		if mb, err = fs.newMsgBlockForWrite(); err != nil {
-			return 0
-		}
+	mb, err := fs.checkLastBlock(emptyRecordLen)
+	if err != nil {
+		return 0
 	}
 
 	// Grab time and last seq.
-	now, seq := time.Now().UTC(), fs.state.LastSeq+1
+	now, seq := time.Now(), fs.state.LastSeq+1
 
 	// Write skip msg.
 	mb.skipMsg(seq, now)
@@ -4227,7 +4309,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 	}
 
 	// Insert into dmap all entries and place last as marker.
-	now := time.Now().UTC()
+	now := time.Now()
 	nowts := now.UnixNano()
 	lseq := seq + num - 1
 
@@ -4321,7 +4403,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
 			shouldExpire = true
 		}
 		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.lsts = getAccessTime()
 
 		bsubj := stringToBytes(subj)
 		if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
@@ -4601,7 +4683,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
 	}
 
 	var smv StoreMsg
-	sm, err := mb.cacheLookup(seq, &smv)
+	sm, err := mb.cacheLookupNoCopy(seq, &smv)
 	if err != nil {
 		mb.mu.Unlock()
 		fsUnlock()
@@ -4615,7 +4697,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
 	msz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
 
 	// Set cache timestamp for last remove.
-	mb.lrts = time.Now().UnixNano()
+	mb.lrts = getAccessTime()
 
 	// Global stats
 	if fs.state.Msgs > 0 {
@@ -4948,7 +5030,12 @@ func (fs *fileStore) isClosed() bool {
 func (mb *msgBlock) spinUpFlushLoop() {
 	mb.mu.Lock()
 	defer mb.mu.Unlock()
+	mb.spinUpFlushLoopLocked()
+}
 
+// Will spin up our flush loop.
+// Lock should be held.
+func (mb *msgBlock) spinUpFlushLoopLocked() {
 	// Are we already running or closed?
 	if mb.flusher || mb.closed {
 		return
 	}
@@ -5128,7 +5215,7 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
 		}
 	}
 	// We should have a valid msg to calculate removal stats.
-	if m, err := mb.cacheLookup(seq, &smv); err == nil {
+	if m, err := mb.cacheLookupNoCopy(seq, &smv); err == nil {
 		if mb.msgs > 0 {
 			rl := fileStoreMsgSize(m.subj, m.hdr, m.msg)
 			mb.msgs--
@@ -5248,11 +5335,11 @@ func (mb *msgBlock) selectNextFirst() {
 	// Need to get the timestamp.
 	// We will try the cache direct and fallback if needed.
 	var smv StoreMsg
-	sm, _ := mb.cacheLookup(seq, &smv)
+	sm, _ := mb.cacheLookupNoCopy(seq, &smv)
 	if sm == nil {
 		// Slow path, need to unlock.
 		mb.mu.Unlock()
-		sm, _, _ = mb.fetchMsg(seq, &smv)
+		sm, _, _ = mb.fetchMsgNoCopy(seq, &smv)
 		mb.mu.Lock()
 	}
 	if sm != nil {
@@ -5400,7 +5487,7 @@ func (mb *msgBlock) expireCacheLocked() {
 	}
 
 	// Grab timestamp to compare.
-	tns := time.Now().UnixNano()
+	tns := getAccessTime()
 
 	// For the core buffer of messages, we care about reads and writes, but not removes.
 	bufts := mb.llts
@@ -5507,14 +5594,16 @@ func (fs *fileStore) expireMsgs() {
 	// Reason is that we need more information to adjust ack pending in consumers.
 	var smv StoreMsg
 	var sm *StoreMsg
+
 	fs.mu.RLock()
 	maxAge := int64(fs.cfg.MaxAge)
-	minAge := time.Now().UnixNano() - maxAge
+	minAge := getAccessTime() - maxAge
 	rmcb := fs.rmcb
 	sdmcb := fs.sdmcb
 	sdmTTL := int64(fs.cfg.SubjectDeleteMarkerTTL.Seconds())
 	sdmEnabled := sdmTTL > 0
 	fs.mu.RUnlock()
+
 	if sdmEnabled && (rmcb == nil || sdmcb == nil) {
 		return
 	}
@@ -5525,7 +5614,7 @@
 		if len(sm.hdr) > 0 {
 			if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 {
 				// The message has a negative TTL, therefore it must "never expire".
-				minAge = time.Now().UnixNano() - maxAge
+				minAge = getAccessTime() - maxAge
 				continue
 			}
 		}
@@ -5542,7 +5631,7 @@
 			fs.mu.Unlock()
 		}
 		// Recalculate in case we are expiring a bunch.
-		minAge = time.Now().UnixNano() - maxAge
+		minAge = getAccessTime() - maxAge
 	}
 }
 
@@ -5736,7 +5825,7 @@ func (mb *msgBlock) enableForWriting(fip bool) error {
 	// Spin up our flusher loop if needed.
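+	// fip means flush in place, i.e. writes are flushed synchronously, in which
+	// case we do not need the background flusher.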
 	if !fip {
-		mb.spinUpFlushLoop()
+		mb.spinUpFlushLoopLocked()
 	}
 
 	return nil
@@ -5752,10 +5841,16 @@ func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error {
 func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
 	mb.mu.Lock()
 	defer mb.mu.Unlock()
+	return mb.writeMsgRecordLocked(rl, seq, subj, mhdr, msg, ts, flush, true)
+}
 
+// Will write the message record to the underlying message block.
+// filestore lock will be held.
+// mb lock should be held.
+func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush, kick bool) error {
 	// Enable for writing if our mfd is not open.
 	if mb.mfd == nil {
-		if err := mb.enableForWriting(flush); err != nil {
+		if err := mb.enableForWriting(flush && kick); err != nil {
 			return err
 		}
 	}
@@ -5774,7 +5869,7 @@
 			return err
 		}
 		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.lsts = getAccessTime()
 		if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
 			ss.Msgs++
 			ss.Last = seq
@@ -5867,7 +5962,7 @@
 			if err != nil {
 				return err
 			}
-		} else {
+		} else if kick {
 			// Kick the flusher here.
 			kickFlusher(fch)
 		}
@@ -5941,6 +6036,9 @@ func (mb *msgBlock) bytesPending() ([]byte, error) {
 
 // Returns the current blkSize including deleted msgs etc.
 func (mb *msgBlock) blkSize() uint64 {
+	if mb == nil {
+		return 0
+	}
 	mb.mu.RLock()
 	nb := mb.rbytes
 	mb.mu.RUnlock()
@@ -5970,32 +6068,41 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
 	}
 }
 
+// Helper to check last msg block and create new one if too big.
 // Lock should be held.
-func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) {
-	var err error
+func (fs *fileStore) checkLastBlock(rl uint64) (lmb *msgBlock, err error) {
+	// Grab our current last message block.
+	lmb = fs.lmb
+	rbytes := lmb.blkSize()
+	if lmb == nil || (rbytes > 0 && rbytes+rl > fs.fcfg.BlockSize) {
+		if lmb != nil && fs.fcfg.Compression != NoCompression {
+			// We've now reached the end of this message block, if we want
+			// to compress blocks then now's the time to do it.
+			go lmb.recompressOnDiskIfNeeded()
+		}
+		if lmb, err = fs.newMsgBlockForWrite(); err != nil {
+			return nil, err
+		}
+	}
+	return lmb, nil
+}
 
+// Lock should be held.
+func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) {
 	// Get size for this message.
 	rl := fileStoreMsgSize(subj, hdr, msg)
 	if rl&hbit != 0 || rl > rlBadThresh {
 		return 0, ErrMsgTooLarge
 	}
 
 	// Grab our current last message block.
-	mb := fs.lmb
+	mb, err := fs.checkLastBlock(rl)
+	if err != nil {
+		return 0, err
+	}
 
 	// Mark as dirty for stream state.
 	fs.dirty++
 
-	if mb == nil || mb.msgs > 0 && mb.blkSize()+rl > fs.fcfg.BlockSize {
-		if mb != nil && fs.fcfg.Compression != NoCompression {
-			// We've now reached the end of this message block, if we want
-			// to compress blocks then now's the time to do it.
-			go mb.recompressOnDiskIfNeeded()
-		}
-		if mb, err = fs.newMsgBlockForWrite(); err != nil {
-			return 0, err
-		}
-	}
-
 	// Ask msg block to store in write through cache.
 	err = mb.writeMsgRecord(rl, seq, subj, hdr, msg, ts, fs.fip)
@@ -6006,19 +6113,28 @@
 // Lock should be held.
 func (fs *fileStore) writeTombstone(seq uint64, ts int64) error {
 	// Grab our current last message block.
-	lmb := fs.lmb
-	var err error
+	lmb, err := fs.checkLastBlock(emptyRecordLen)
+	if err != nil {
+		return err
+	}
+	return lmb.writeTombstone(seq, ts)
+}
 
-	if lmb == nil || lmb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize {
-		if lmb != nil && fs.fcfg.Compression != NoCompression {
-			// We've now reached the end of this message block, if we want
-			// to compress blocks then now's the time to do it.
-			go lmb.recompressOnDiskIfNeeded()
-		}
-		if lmb, err = fs.newMsgBlockForWrite(); err != nil {
-			return err
-		}
+// For writing tombstones to our lmb. This version will enforce maximum block sizes
+// but does not flush the contents.
+// Lock should be held.
+func (fs *fileStore) writeTombstoneNoFlush(seq uint64, ts int64) error {
+	// Grab our current last message block.
+	olmb := fs.lmb
+	lmb, err := fs.checkLastBlock(emptyRecordLen)
+	if err != nil {
+		return err
+	}
+	// If we swapped out our lmb, flush any pending.
+	if olmb != lmb {
+		olmb.flushPendingMsgs()
 	}
+	// Write tombstone without flush or kick.
 	return lmb.writeTombstone(seq, ts)
 }
@@ -6434,12 +6550,13 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
 		popFss = true
 	}
 	// Mark fss activity.
-	mb.lsts = time.Now().UnixNano()
+	mb.lsts = getAccessTime()
 
 	mb.ttls = 0
 	lbuf := uint32(len(buf))
 	var seq, ttls uint64
 	var sm StoreMsg // Used for finding TTL headers
+
 	for index < lbuf {
 		if index+msgHdrSize > lbuf {
 			return errCorruptState
@@ -6516,8 +6633,8 @@
 		// Count how many TTLs we think are in this message block.
 		// TODO(nat): Not terribly optimal...
 		if hasHeaders {
-			if fsm, err := mb.msgFromBuf(buf[index:], &sm, nil); err == nil && fsm != nil {
-				if _, err = getMessageTTL(fsm.hdr); err == nil && len(fsm.hdr) > 0 {
+			if fsm, err := mb.msgFromBufNoCopy(buf[index:], &sm, nil); err == nil && fsm != nil {
+				if ttl := sliceHeader(JSMessageTTL, fsm.hdr); len(ttl) > 0 {
 					ttls++
 				}
 			}
@@ -6659,7 +6776,7 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
 	// Decide what we want to do with the buffer in hand. If we have load interest
 	// we will hold onto the whole thing, otherwise empty the buffer, possibly reusing it.
-	if ts := time.Now().UnixNano(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) {
+	if ts := getAccessTime(); ts < mb.llts || (ts-mb.llts) <= int64(mb.cexp) {
 		mb.cache.wp += lob
 	} else {
 		if cap(mb.cache.buf) <= maxBufReuse {
@@ -6822,7 +6939,7 @@ checkCache:
 		return nil
 	}
 
-	mb.llts = time.Now().UnixNano()
+	mb.llts = getAccessTime()
 
 	// FIXME(dlc) - We could be smarter here.
 	if buf, _ := mb.bytesPending(); len(buf) > 0 {
@@ -6896,7 +7013,24 @@ checkCache:
 
 // Fetch a message from this block, possibly reading in and caching the messages.
 // We assume the block was selected and is correct, so we do not do range checks.
+// Lock should not be held.
 func (mb *msgBlock) fetchMsg(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
+	return mb.fetchMsgEx(seq, sm, true)
+}
+
+// Fetch a message from this block, possibly reading in and caching the messages.
+// We assume the block was selected and is correct, so we do not do range checks.
+// We will not copy the msg data.
+// Lock should not be held.
+func (mb *msgBlock) fetchMsgNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
+	return mb.fetchMsgEx(seq, sm, false)
+}
+
+// Fetch a message from this block, possibly reading in and caching the messages.
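+// fetchMsgEx is the common implementation behind fetchMsg and fetchMsgNoCopy.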
+// We assume the block was selected and is correct, so we do not do range checks.
+// We will copy the msg data based on the doCopy boolean.
+// Lock should not be held.
+func (mb *msgBlock) fetchMsgEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, bool, error) {
 	mb.mu.Lock()
 	defer mb.mu.Unlock()
@@ -6923,7 +7057,7 @@
 	}
 
 	llseq := mb.llseq
-	fsm, err := mb.cacheLookup(seq, sm)
+	fsm, err := mb.cacheLookupEx(seq, sm, doCopy)
 	if err != nil {
 		return nil, false, err
 	}
@@ -6965,8 +7099,22 @@ const (
 )
 
 // Will do a lookup from cache.
+// This will copy the msg from the cache.
 // Lock should be held.
 func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
+	return mb.cacheLookupEx(seq, sm, true)
+}
+
+// Will do a lookup from cache.
+// This will NOT copy the msg from the cache.
+// Lock should be held.
+func (mb *msgBlock) cacheLookupNoCopy(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
+	return mb.cacheLookupEx(seq, sm, false)
+}
+
+// Will do a lookup from cache.
+// Lock should be held.
+func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*StoreMsg, error) {
 	if seq < atomic.LoadUint64(&mb.first.seq) || seq > atomic.LoadUint64(&mb.last.seq) {
 		return nil, ErrStoreMsgNotFound
 	}
@@ -6980,7 +7128,7 @@
 
 	// If we have a delete map check it.
 	if mb.dmap.Exists(seq) {
-		mb.llts = time.Now().UnixNano()
+		mb.llts = getAccessTime()
 		return nil, errDeletedMsg
 	}
 
@@ -7011,7 +7159,7 @@
 	}
 
 	// Update cache activity.
-	mb.llts = time.Now().UnixNano()
+	mb.llts = getAccessTime()
 
 	li := int(bi) - mb.cache.off
 	if li >= len(mb.cache.buf) {
@@ -7026,7 +7174,7 @@
 	}
 
 	// Parse from the raw buffer.
-	fsm, err := mb.msgFromBuf(buf, sm, hh)
+	fsm, err := mb.msgFromBufEx(buf, sm, hh, doCopy)
 	if err != nil || fsm == nil {
 		return nil, err
 	}
@@ -7059,7 +7207,7 @@
 	}
 	var smv StoreMsg
 	if mb := fs.selectMsgBlock(seq); mb != nil {
-		if sm, _, _ := mb.fetchMsg(seq, &smv); sm != nil {
+		if sm, _, _ := mb.fetchMsgNoCopy(seq, &smv); sm != nil {
 			return int(fileStoreMsgSize(sm.subj, sm.hdr, sm.msg))
 		}
 	}
@@ -7067,6 +7215,7 @@
 
 // Will return message for the given sequence number.
+// This will be returned to external callers.
 func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
 	return fs.msgForSeqLocked(seq, sm, true)
 }
@@ -7117,8 +7266,24 @@
 
 // Internal function to return msg parts from a raw buffer.
+// Raw buffer will be copied into sm.
 // Lock should be held.
 func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) {
+	return mb.msgFromBufEx(buf, sm, hh, true)
+}
+
+// Internal function to return msg parts from a raw buffer.
+// Raw buffer will NOT be copied into sm.
+// For internal use only; any message that is passed to upper layers should use mb.msgFromBuf.
+// Lock should be held.
+func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) {
+	return mb.msgFromBufEx(buf, sm, hh, false)
+}
+
+// Internal function to return msg parts from a raw buffer.
+// The doCopy boolean determines whether we make a copy; when false, the returned
+// StoreMsg aliases buf and must not outlive the underlying cache buffer.
+// Lock should be held.
+func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) {
 	if len(buf) < emptyRecordLen {
 		return nil, errBadMsg
 	}
@@ -7169,18 +7334,30 @@
 		hl := le.Uint32(data[slen:])
 		bi := slen + 4
 		li := bi + int(hl)
-		sm.buf = append(sm.buf, data[bi:end]...)
+		if doCopy {
+			sm.buf = append(sm.buf, data[bi:end]...)
+		} else {
+			sm.buf = data[bi:end]
+		}
 		li, end = li-bi, end-bi
 		sm.hdr = sm.buf[0:li:li]
 		sm.msg = sm.buf[li:end]
 	} else {
-		sm.buf = append(sm.buf, data[slen:end]...)
+		if doCopy {
+			sm.buf = append(sm.buf, data[slen:end]...)
+		} else {
+			sm.buf = data[slen:end]
+		}
 		sm.msg = sm.buf[0 : end-slen]
 	}
 	sm.seq, sm.ts = seq, ts
 	if slen > 0 {
-		// Make a copy since sm.subj lifetime may last longer.
-		sm.subj = string(data[:slen])
+		if doCopy {
+			// Make a copy since sm.subj lifetime may last longer.
+			sm.subj = string(data[:slen])
+		} else {
+			sm.subj = bytesToString(data[:slen])
+		}
 	}
 
 	return sm, nil
@@ -7243,7 +7420,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err
 		return nil, err
 	}
 	// Mark fss activity.
-	mb.lsts = time.Now().UnixNano()
+	mb.lsts = getAccessTime()
 
 	var l uint64
 	// Optimize if subject is not a wildcard.
@@ -7781,6 +7958,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
 
 	var smv StoreMsg
 	var tombs []msgId
+	var lowSeq uint64
 
 	fs.mu.Lock()
 	// We may remove blocks as we purge, so don't range directly on fs.blks
@@ -7812,8 +7990,8 @@
 			shouldExpire = true
 		}
 
-		for seq := f; seq <= l; seq++ {
-			if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && eq(sm.subj, subject) {
+		for seq, te := f, len(tombs); seq <= l; seq++ {
+			if sm, _ := mb.cacheLookupNoCopy(seq, &smv); sm != nil && eq(sm.subj, subject) {
 				rl := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
 				// Do fast in place remove.
 				// Stats
@@ -7837,13 +8015,19 @@
 				// PSIM and FSS updates.
 				mb.removeSeqPerSubject(sm.subj, seq)
 				fs.removePerSubject(sm.subj)
+				// Track tombstones we need to write.
 				tombs = append(tombs, msgId{sm.seq, sm.ts})
+				if sm.seq < lowSeq || lowSeq == 0 {
+					lowSeq = sm.seq
+				}
 
 				// Check for first message.
 				if seq == atomic.LoadUint64(&mb.first.seq) {
 					mb.selectNextFirst()
 					if mb.isEmpty() {
+						// Since we are removing this block don't need to write tombstones.
+						tombs = tombs[:te]
 						fs.removeMsgBlock(mb)
 						i--
 						// keep flag set, if set previously
@@ -7856,14 +8040,15 @@
 					// Out of order delete.
 					mb.dmap.Insert(seq)
 				}
-
-				if maxp > 0 && purged >= maxp {
+				// Break if we have emptied this block or if we set a maximum purge count.
+				if mb.isEmpty() || (maxp > 0 && purged >= maxp) {
 					break
 				}
 			}
 		}
-		// Expire if we were responsible for loading.
-		if shouldExpire {
+		// Expire if we were responsible for loading and we do not seem to be doing successive purgeEx calls.
+		// On successive calls, most likely from KV purge deletes, we want to keep the data loaded.
+		if shouldExpire && time.Since(fs.lpex) > time.Second {
 			// Expire this cache before moving on.
 			mb.tryForceExpireCacheLocked()
 		}
@@ -7878,18 +8063,33 @@
 		fs.selectNextFirst()
 	}
 
+	// Update the last purgeEx call time.
+	defer func() { fs.lpex = time.Now() }()
+
 	// Write any tombstones as needed.
-	for _, tomb := range tombs {
-		fs.writeTombstone(tomb.seq, tomb.ts)
+	// When writing multiple tombstones we will flush at the end.
+	if len(tombs) > 0 {
+		for _, tomb := range tombs {
+			if err := fs.writeTombstoneNoFlush(tomb.seq, tomb.ts); err != nil {
+				return purged, err
+			}
+		}
+		if lmb := fs.lmb; lmb != nil {
+			lmb.flushPendingMsgs()
+		}
 	}
-
 	os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
 	fs.dirty++
 	cb := fs.scb
 	fs.mu.Unlock()
 
 	if cb != nil {
-		cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
+		if purged == 1 {
+			cb(-int64(purged), -int64(bytes), lowSeq, subject)
+		} else {
+			// FIXME(dlc) - Since we track lowSeq we could send to upper layer if they dealt with the condition properly.
+			cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
+		}
 	}
 
 	return purged, nil
@@ -8064,7 +8264,7 @@
 		}
 	}
 	for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ {
-		sm, err := smb.cacheLookup(mseq, &smv)
+		sm, err := smb.cacheLookupNoCopy(mseq, &smv)
 		if err == errDeletedMsg {
 			// Update dmap.
 			if !smb.dmap.IsEmpty() {
@@ -8331,7 +8531,7 @@ func (fs *fileStore) Truncate(seq uint64) error {
 		fs.mu.Unlock()
 		return ErrInvalidSequence
 	}
-	lsm, _, _ := nlmb.fetchMsg(seq, nil)
+	lsm, _, _ := nlmb.fetchMsgNoCopy(seq, nil)
 	if lsm == nil {
 		fs.mu.Unlock()
 		return ErrInvalidSequence
@@ -8722,7 +8922,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
 			// It gets set later on if the fss is non-empty anyway.
 			continue
 		}
-		sm, err := mb.cacheLookup(seq, &smv)
+		sm, err := mb.cacheLookupNoCopy(seq, &smv)
 		if err != nil {
 			// Since we are walking by sequence we can ignore some errors that are benign to rebuilding our state.
 			if err == ErrStoreMsgNotFound || err == errDeletedMsg {
@@ -8746,9 +8946,9 @@
 
 	if mb.fss.Size() > 0 {
 		// Make sure we run the cache expire timer.
-		mb.llts = time.Now().UnixNano()
-		// Mark fss activity.
-		mb.lsts = time.Now().UnixNano()
+		mb.llts = getAccessTime()
+		// Mark fss activity same as load time.
+		mb.lsts = mb.llts
 		mb.startCacheExpireTimer()
 	}
 	return nil
@@ -8760,7 +8960,7 @@
 	if mb.fss != nil || mb.noTrack {
 		if mb.fss != nil {
 			// Mark fss activity.
-			mb.lsts = time.Now().UnixNano()
+			mb.lsts = getAccessTime()
 		}
 		return nil
 	}
@@ -10906,3 +11106,27 @@ func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
 	}
 	return f.Close()
 }
+
+// This is to offload UnixNano() processing from timestamp creation for cache management.
+var (
+	tsOnce     sync.Once
+	accessTime atomic.Int64
+)
+
+// Update every 100ms.
+const accessTimeTickInterval = 100 * time.Millisecond
+
+// Will load the access time from an atomic. We will also set up the Go routine
+// to update this in one place.
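+// Reads may be up to accessTimeTickInterval (100ms) stale, which is fine for
+// cache age accounting where only coarse recency matters.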
+func getAccessTime() int64 {
+	tsOnce.Do(func() {
+		accessTime.Store(time.Now().UnixNano())
+		go func() {
+			ticker := time.NewTicker(accessTimeTickInterval)
+			for range ticker.C {
+				accessTime.Store(time.Now().UnixNano())
+			}
+		}()
+	})
+	return accessTime.Load()
+}
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 22a37d95c40..a08654c305c 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -33,6 +33,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"slices"
 	"strconv"
 	"strings"
 	"sync"
@@ -3666,6 +3667,15 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
 		_, _, err = fs.StoreMsg("foo.2", nil, []byte("xxxxxx"), 0)
 		require_NoError(t, err)
 
+		// Make sure we have our state file prior to Purge call.
+		fs.forceWriteFullState()
+
+		// Capture the current index.db file.
+		sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
+		buf, err := os.ReadFile(sfile)
+		require_NoError(t, err)
+		require_True(t, len(buf) > 0)
+
 		// This should purge all "foo.1"
 		p, err := fs.PurgeEx("foo.1", 1, 0)
 		require_NoError(t, err)
@@ -3698,6 +3708,79 @@
 		if state := fs.State(); !reflect.DeepEqual(state, before) {
 			t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state)
 		}
+
+		// If we had an index.db from after PurgeEx but before Stop() would rewrite, make sure we
+		// can properly recover with the old index file. This would be a crash after the PurgeEx() call.
+		fs.Stop()
+		err = os.WriteFile(sfile, buf, defaultFilePerms)
+		require_NoError(t, err)
+
+		fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		if state := fs.State(); !reflect.DeepEqual(state, before) {
+			t.Fatalf("Expected state of %+v, got %+v with old index.db state", before, state)
+		}
+	})
+}
+
+func TestFileStorePurgeExNoTombsOnBlockRemoval(t *testing.T) {
+	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+		fcfg.BlockSize = 1000
+		cfg := StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Storage: FileStorage}
+		created := time.Now()
+		fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		payload := make([]byte, 20)
+
+		total := 100
+		for i := 0; i < total; i++ {
+			_, _, err = fs.StoreMsg("foo.1", nil, payload, 0)
+			require_NoError(t, err)
+		}
+		_, _, err = fs.StoreMsg("foo.2", nil, payload, 0)
+		require_NoError(t, err)
+
+		require_Equal(t, fs.numMsgBlocks(), 6)
+
+		// Make sure we have our state file prior to Purge call.
+		fs.forceWriteFullState()
+
+		// Capture the current index.db file if it exists.
+		sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
+		buf, err := os.ReadFile(sfile)
+		require_NoError(t, err)
+		require_True(t, len(buf) > 0)
+
+		// This should purge all "foo.1". This will remove the blocks so we want to make sure
+		// we do not write excessive tombstones here.
+		p, err := fs.PurgeEx("foo.1", 1, 0)
+		require_NoError(t, err)
+		require_Equal(t, p, uint64(total))
+
+		state := fs.State()
+		require_Equal(t, state.Msgs, 1)
+		require_Equal(t, state.FirstSeq, 101)
+
+		// Check that we only have 1 msg block.
+		require_Equal(t, fs.numMsgBlocks(), 1)
+
+		// Put the old index.db back. We want to make sure that, without the empty block
+		// tombstones, we still properly recover state.
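+		// Recovery must reconcile the stale index with the surviving blocks alone,
+		// since the removed blocks wrote no tombstones.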
+		fs.Stop()
+		err = os.WriteFile(sfile, buf, defaultFilePerms)
+		require_NoError(t, err)
+
+		fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		state = fs.State()
+		require_Equal(t, state.Msgs, 1)
+		require_Equal(t, state.FirstSeq, 101)
 	})
 }
 
@@ -4941,7 +5024,7 @@ func TestFileStoreSkipMsgAndNumBlocks(t *testing.T) {
 		fs.SkipMsg()
 	}
 	fs.StoreMsg(subj, nil, msg, 0)
-	require_True(t, fs.numMsgBlocks() == 2)
+	require_Equal(t, fs.numMsgBlocks(), 3)
 }
 
 func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
@@ -6457,7 +6540,7 @@ func TestFileStoreFSSMeta(t *testing.T) {
 	fs.StoreMsg("A", nil, msg, 0)
 
 	// Let cache's expire before PurgeEx which will load them back in.
-	time.Sleep(250 * time.Millisecond)
+	time.Sleep(500 * time.Millisecond)
 
 	p, err := fs.PurgeEx("A", 1, 0)
 	require_NoError(t, err)
@@ -6508,7 +6591,7 @@ func TestFileStoreExpireCacheOnLinearWalk(t *testing.T) {
 	}
 	// Let them all expire. This way we load as we walk and can test that we expire all blocks without
 	// needing to worry about last write times blocking forced expiration.
-	time.Sleep(expire)
+	time.Sleep(expire + accessTimeTickInterval)
 
 	checkNoCache := func() {
 		t.Helper()
@@ -6731,7 +6814,7 @@ func TestFileStoreEraseMsgWithAllTrailingDbitSlots(t *testing.T) {
 func TestFileStoreMultiLastSeqs(t *testing.T) {
 	fs, err := newFileStore(
 		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 256}, // Make block size small to test multiblock selections with maxSeq
-		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
+		StreamConfig{Name: "zzz", Subjects: []string{"foo.*", "bar.*"}, Storage: FileStorage})
 	require_NoError(t, err)
 	defer fs.Stop()
 
@@ -9443,3 +9526,32 @@ func TestFileStoreRemoveMsgBlockLast(t *testing.T) {
 	_, err = os.Stat(ofn)
 	require_True(t, os.IsNotExist(err))
 }
+
+func TestFileStoreAllLastSeqs(t *testing.T) {
+	fs, err := newFileStore(
+		FileStoreConfig{StoreDir: t.TempDir()},
+		StreamConfig{Name: "zzz", Subjects: []string{"*.*"}, MaxMsgsPer: 50, Storage: FileStorage})
+	require_NoError(t, err)
+	defer fs.Stop()
+
+	subjs := []string{"foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz"}
+	msg := []byte("abc")
+
+	for i := 0; i < 100_000; i++ {
+		subj := subjs[rand.Intn(len(subjs))]
+		fs.StoreMsg(subj, nil, msg, 0)
+	}
+
+	expected := make([]uint64, 0, len(subjs))
+	var smv StoreMsg
+	for _, subj := range subjs {
+		sm, err := fs.LoadLastMsg(subj, &smv)
+		require_NoError(t, err)
+		expected = append(expected, sm.seq)
+	}
+	slices.Sort(expected)
+
+	seqs, err := fs.AllLastSeqs()
+	require_NoError(t, err)
+	require_True(t, reflect.DeepEqual(seqs, expected))
+}
diff --git a/server/memstore.go b/server/memstore.go
index 884cb139280..708485981b1 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -637,6 +637,44 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
 	return fss
 }
 
+// AllLastSeqs will return a sorted list of last sequences for all subjects.
+func (ms *memStore) AllLastSeqs() ([]uint64, error) {
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+
+	if len(ms.msgs) == 0 {
+		return nil, nil
+	}
+
+	seqs := make([]uint64, 0, ms.fss.Size())
+	ms.fss.IterFast(func(subj []byte, ss *SimpleState) bool {
+		seqs = append(seqs, ss.Last)
+		return true
+	})
+
+	slices.Sort(seqs)
+	return seqs, nil
+}
+
+// Helper to determine if the filter(s) represent all the subjects.
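+// As with the filestore version, the filters are sorted and then compared
+// against the stream's configured subjects.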
+// Most clients send in subjects even if they match the stream's ingest subjects.
+// Lock should be held.
+func (ms *memStore) filterIsAll(filters []string) bool {
+	if len(filters) != len(ms.cfg.Subjects) {
+		return false
+	}
+	// Sort so we can compare.
+	slices.Sort(filters)
+	for i, subj := range filters {
+		if !subjectIsSubsetMatch(ms.cfg.Subjects[i], subj) {
+			return false
+		}
+	}
+	return true
+}
+
+// MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters.
+// We will not exceed the maxSeq, which if 0 becomes the store's last sequence.
 func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) {
 	ms.mu.RLock()
 	defer ms.mu.RUnlock()
@@ -645,12 +683,16 @@
 		return nil, nil
 	}
 
+	// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
+	if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) {
+		return ms.AllLastSeqs()
+	}
+
 	// Implied last sequence.
 	if maxSeq == 0 {
 		maxSeq = ms.state.LastSeq
 	}
 
-	//subs := make(map[string]*SimpleState)
 	seqs := make([]uint64, 0, 64)
 	seen := make(map[uint64]struct{})
diff --git a/server/memstore_test.go b/server/memstore_test.go
index 1fa04165fc1..77ab3a2c22c 100644
--- a/server/memstore_test.go
+++ b/server/memstore_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"slices"
 	"testing"
 	"time"
 
@@ -920,7 +921,7 @@ func TestMemStoreSkipMsgs(t *testing.T) {
 func TestMemStoreMultiLastSeqs(t *testing.T) {
 	cfg := &StreamConfig{
 		Name:     "zzz",
-		Subjects: []string{"foo.*"},
+		Subjects: []string{"foo.*", "bar.*"},
 		Storage:  MemoryStorage,
 	}
 	ms, err := newMemStore(cfg)
@@ -1258,6 +1259,39 @@ func TestMemStoreSubjectDeleteMarkers(t *testing.T) {
 	require_Equal(t, bytesToString(getHeader(JSMessageTTL, im.hdr)), "1s")
 }
 
+func TestMemStoreAllLastSeqs(t *testing.T) {
+	cfg := &StreamConfig{
+		Name:       "zzz",
+		Subjects:   []string{"*.*"},
+		MaxMsgsPer: 50,
+		Storage:    MemoryStorage,
+	}
+	ms, err := newMemStore(cfg)
+	require_NoError(t, err)
+	defer ms.Stop()
+
+	subjs := []string{"foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz"}
+	msg := []byte("abc")
+
+	for i := 0; i < 100_000; i++ {
+		subj := subjs[rand.Intn(len(subjs))]
+		ms.StoreMsg(subj, nil, msg, 0)
+	}
+
+	expected := make([]uint64, 0, len(subjs))
+	var smv StoreMsg
+	for _, subj := range subjs {
+		sm, err := ms.LoadLastMsg(subj, &smv)
+		require_NoError(t, err)
+		expected = append(expected, sm.seq)
+	}
+	slices.Sort(expected)
+
+	seqs, err := ms.AllLastSeqs()
+	require_NoError(t, err)
+	require_True(t, reflect.DeepEqual(seqs, expected))
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Benchmarks
 ///////////////////////////////////////////////////////////////////////////
diff --git a/server/pse/pse_freebsd_sysctl.go b/server/pse/pse_freebsd_sysctl.go
index 09010417ee0..f4ea1993409 100644
--- a/server/pse/pse_freebsd_sysctl.go
+++ b/server/pse/pse_freebsd_sysctl.go
@@ -1,4 +1,4 @@
-// Copyright 2015-2020 The NATS Authors
+// Copyright 2015-2025 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -22,7 +22,7 @@
 // We've switched the other implementation to include '_cgo' in the filename,
 // to show that it's not the default. This isn't an os or arch build tag,
 // so we have to use explicit build-tags within.
-// If lacking CGO support and targetting an unsupported arch, then before the
+// If lacking CGO support and targeting an unsupported arch, then before the
 // change you would have a compile failure for not being able to cross-compile.
 // After the change, you have a compile failure for not having the symbols
 // because no source file satisfies them.
diff --git a/server/store.go b/server/store.go
index ec7c6e8e4a4..308d0b08ee2 100644
--- a/server/store.go
+++ b/server/store.go
@@ -110,6 +110,7 @@ type StreamStore interface {
 	FilteredState(seq uint64, subject string) SimpleState
 	SubjectsState(filterSubject string) map[string]SimpleState
 	SubjectsTotals(filterSubject string) map[string]uint64
+	AllLastSeqs() ([]uint64, error)
 	MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error)
 	NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64)
 	NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64)
diff --git a/server/stream.go b/server/stream.go
index d46535037e1..a8f9d26fbf3 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -2279,7 +2279,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
 		// or consumer filter subject is subset of purged subject,
 		// but not the other way around.
 		o.isEqualOrSubsetMatch(preq.Subject)
-	// Check if a consumer has a wider subject space then what we purged
+	// Check if a consumer has a wider subject space than what we purged
 	var isWider bool
 	if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) {
 		doPurge, isWider = true, true
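
Illustrative usage of the new store API (a sketch against the StreamStore
interface above, not code from this patch; assumes a StreamStore value `store`
and a filter list `filters`):

	// All per-subject last sequences, sorted ascending.
	seqs, err := store.AllLastSeqs()
	if err != nil {
		// handle error
	}
	// MultiLastSeqs bounds the walk by maxSeq (0 means the store's last
	// sequence) and caps results via maxAllowed (<= 0 means unlimited).
	// When the filters cover all configured subjects and neither bound is
	// set, it now short circuits to AllLastSeqs internally.
	seqs, err = store.MultiLastSeqs(filters, 0, 0)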