Skip to content

Commit 1a964fa

Browse files
[IMPROVED] Optimizations for PurgeEx from KV PurgeDeletes() clients. (#6801)
Various optimizations and introduction of AllLastSeqs(). Filestore specific items. 1. Internal calls to fetch a msg now use a NoCopy version to avoid unnecessary allocations. 2. We introduced a last purge time to detect repeated calls to Purge from a client KV PurgeDeletes operation. 3. Access times for cache lifetime management are now from a single ticker updating an atomic every 100ms. 4. filteredPendingLocked now detects non-wildcard filters. 5. AllLastSeqs() is an optimized version to return all last sequences for all subjects. Helpful for KV watchers. 6. Optimized MultiLastSeqs() and allow it to call into AllLastSeqs() when appropriate. 7. Allow multiple tombstones to be written async and flushed all at once. Signed-off-by: Derek Collison <[email protected]>
2 parents b677274 + 852bfc7 commit 1a964fa

File tree

9 files changed

+614
-215
lines changed

9 files changed

+614
-215
lines changed

server/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4329,7 +4329,7 @@ func sliceHeader(key string, hdr []byte) []byte {
43294329
if len(hdr) == 0 {
43304330
return nil
43314331
}
4332-
index := bytes.Index(hdr, []byte(key))
4332+
index := bytes.Index(hdr, stringToBytes(key))
43334333
hdrLen := len(hdr)
43344334
// Check that we have enough characters, this will handle the -1 case of the key not
43354335
// being found and will also handle not having enough characters for trailing CRLF.

server/consumer.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5322,9 +5322,6 @@ func (o *consumer) selectStartingSeqNo() {
53225322
if mmp == 1 {
53235323
o.sseq = state.FirstSeq
53245324
} else {
5325-
// A threshold for when we switch from get last msg to subjects state.
5326-
const numSubjectsThresh = 256
5327-
lss := &lastSeqSkipList{resume: state.LastSeq}
53285325
var filters []string
53295326
if o.subjf == nil {
53305327
filters = append(filters, o.cfg.FilterSubject)
@@ -5333,24 +5330,10 @@ func (o *consumer) selectStartingSeqNo() {
53335330
filters = append(filters, filter.subject)
53345331
}
53355332
}
5336-
for _, filter := range filters {
5337-
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
5338-
var smv StoreMsg
5339-
for subj := range st {
5340-
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
5341-
lss.seqs = append(lss.seqs, sm.seq)
5342-
}
5343-
}
5344-
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
5345-
for _, ss := range mss {
5346-
lss.seqs = append(lss.seqs, ss.Last)
5347-
}
5348-
}
5349-
}
5350-
// Sort the skip list if needed.
5351-
if len(lss.seqs) > 1 {
5352-
slices.Sort(lss.seqs)
5353-
}
5333+
5334+
lss := &lastSeqSkipList{resume: state.LastSeq}
5335+
lss.seqs, _ = o.mset.store.MultiLastSeqs(filters, 0, 0)
5336+
53545337
if len(lss.seqs) == 0 {
53555338
o.sseq = state.LastSeq
53565339
} else {
@@ -5874,7 +5857,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
58745857

58755858
// Check if this message was pending.
58765859
p, wasPending := o.pending[sseq]
5877-
rdc := o.deliveryCount(sseq)
5860+
var rdc uint64
5861+
if wasPending {
5862+
rdc = o.deliveryCount(sseq)
5863+
}
58785864

58795865
o.mu.Unlock()
58805866

0 commit comments

Comments
 (0)