@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
2315
2315
fseq = lseq + 1
2316
2316
for _ , subj := range subs {
2317
2317
ss , _ := mb .fss .Find (stringToBytes (subj ))
2318
- if ss != nil && ss .firstNeedsUpdate {
2319
- mb .recalculateFirstForSubj (subj , ss . First , ss )
2318
+ if ss != nil && ( ss .firstNeedsUpdate || ss . lastNeedsUpdate ) {
2319
+ mb .recalculateForSubj (subj , ss )
2320
2320
}
2321
2321
if ss == nil || start > ss .Last || ss .First >= fseq {
2322
2322
continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
2445
2445
// If we already found a partial then don't do anything else.
2446
2446
return
2447
2447
}
2448
- if ss .firstNeedsUpdate {
2449
- mb .recalculateFirstForSubj (bytesToString (bsubj ), ss . First , ss )
2448
+ if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2449
+ mb .recalculateForSubj (bytesToString (bsubj ), ss )
2450
2450
}
2451
2451
if sseq <= ss .First {
2452
2452
update (ss )
@@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
2745
2745
mb .lsts = time .Now ().UnixNano ()
2746
2746
mb .fss .Match (stringToBytes (subject ), func (bsubj []byte , ss * SimpleState ) {
2747
2747
subj := string (bsubj )
2748
- if ss .firstNeedsUpdate {
2749
- mb .recalculateFirstForSubj (subj , ss . First , ss )
2748
+ if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2749
+ mb .recalculateForSubj (subj , ss )
2750
2750
}
2751
2751
oss := fss [subj ]
2752
2752
if oss .First == 0 { // New
@@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
2936
2936
return
2937
2937
}
2938
2938
subj := bytesToString (bsubj )
2939
- if ss .firstNeedsUpdate {
2940
- mb .recalculateFirstForSubj (subj , ss . First , ss )
2939
+ if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2940
+ mb .recalculateForSubj (subj , ss )
2941
2941
}
2942
2942
if sseq <= ss .First {
2943
2943
t += ss .Msgs
@@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
3224
3224
// If we already found a partial then don't do anything else.
3225
3225
return
3226
3226
}
3227
- if ss .firstNeedsUpdate {
3228
- mb .recalculateFirstForSubj (subj , ss . First , ss )
3227
+ if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
3228
+ mb .recalculateForSubj (subj , ss )
3229
3229
}
3230
3230
if sseq <= ss .First {
3231
3231
t += ss .Msgs
@@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
3898
3898
info .fblk = i
3899
3899
}
3900
3900
}
3901
- if ss .firstNeedsUpdate {
3902
- mb .recalculateFirstForSubj (subj , ss . First , ss )
3901
+ if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
3902
+ mb .recalculateForSubj (subj , ss )
3903
3903
}
3904
3904
mb .mu .Unlock ()
3905
3905
// Re-acquire fs lock
@@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
4030
4030
mb .mu .Lock ()
4031
4031
mb .ensurePerSubjectInfoLoaded ()
4032
4032
ss , ok := mb .fss .Find (stringToBytes (subj ))
4033
- if ok && ss != nil && ss .firstNeedsUpdate {
4034
- mb .recalculateFirstForSubj (subj , ss . First , ss )
4033
+ if ok && ss != nil && ( ss .firstNeedsUpdate || ss . lastNeedsUpdate ) {
4034
+ mb .recalculateForSubj (subj , ss )
4035
4035
}
4036
4036
mb .mu .Unlock ()
4037
4037
if ss == nil {
@@ -7387,9 +7387,6 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
7387
7387
// Update fss
7388
7388
smb .removeSeqPerSubject (sm .subj , mseq )
7389
7389
fs .removePerSubject (sm .subj )
7390
- // Need to mark the sequence as deleted. Otherwise, recalculating ss.First
7391
- // for per-subject info would be able to find it still.
7392
- smb .dmap .Insert (mseq )
7393
7390
}
7394
7391
}
7395
7392
@@ -7835,76 +7832,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
7835
7832
7836
7833
ss .Msgs --
7837
7834
7838
- // Only one left.
7839
- if ss .Msgs == 1 {
7840
- // Update first if we need to, we must check if this removal is about what's going to be ss.First
7841
- if ss .firstNeedsUpdate {
7842
- mb .recalculateFirstForSubj (subj , ss .First , ss )
7843
- }
7844
- // If we're removing the first message, we must recalculate again.
7845
- // ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
7846
- if ss .First == seq {
7847
- mb .recalculateFirstForSubj (subj , ss .First , ss )
7848
- }
7849
- ss .Last = ss .First
7850
- ss .firstNeedsUpdate = false
7851
- return
7852
- }
7853
-
7854
- // We can lazily calculate the first sequence when needed.
7835
+ // We can lazily calculate the first/last sequence when needed.
7855
7836
ss .firstNeedsUpdate = seq == ss .First || ss .firstNeedsUpdate
7837
+ ss .lastNeedsUpdate = seq == ss .Last || ss .lastNeedsUpdate
7856
7838
}
7857
7839
7858
- // Will recalulate the first sequence for this subject in this block.
7840
+ // Will recalculate the first and/or last sequence for this subject in this block.
7859
7841
// Will avoid slower path message lookups and scan the cache directly instead.
7860
- func (mb * msgBlock ) recalculateFirstForSubj (subj string , startSeq uint64 , ss * SimpleState ) {
7842
+ func (mb * msgBlock ) recalculateForSubj (subj string , ss * SimpleState ) {
7861
7843
// Need to make sure messages are loaded.
7862
7844
if mb .cacheNotLoaded () {
7863
7845
if err := mb .loadMsgsWithLock (); err != nil {
7864
7846
return
7865
7847
}
7866
7848
}
7867
7849
7868
- // Mark first as updated.
7869
- ss . firstNeedsUpdate = false
7870
-
7871
- startSlot := int ( startSeq - mb . cache . fseq )
7850
+ startSlot := int ( ss . First - mb . cache . fseq )
7851
+ if startSlot < 0 {
7852
+ startSlot = 0
7853
+ }
7872
7854
if startSlot >= len (mb .cache .idx ) {
7873
7855
ss .First = ss .Last
7874
7856
return
7875
- } else if startSlot < 0 {
7876
- startSlot = 0
7877
7857
}
7878
-
7879
- fseq := startSeq + 1
7880
- if mbFseq := atomic .LoadUint64 (& mb .first .seq ); fseq < mbFseq {
7881
- fseq = mbFseq
7858
+ endSlot := int (ss .Last - mb .cache .fseq )
7859
+ if endSlot < 0 {
7860
+ endSlot = 0
7861
+ }
7862
+ if endSlot >= len (mb .cache .idx ) || startSlot > endSlot {
7863
+ return
7882
7864
}
7865
+
7883
7866
var le = binary .LittleEndian
7884
- for slot := startSlot ; slot < len (mb .cache .idx ); slot ++ {
7885
- bi := mb .cache .idx [slot ] &^ hbit
7886
- if bi == dbit {
7887
- // delete marker so skip.
7888
- continue
7867
+ if ss .firstNeedsUpdate {
7868
+ // Mark first as updated.
7869
+ ss .firstNeedsUpdate = false
7870
+
7871
+ fseq := ss .First + 1
7872
+ if mbFseq := atomic .LoadUint64 (& mb .first .seq ); fseq < mbFseq {
7873
+ fseq = mbFseq
7889
7874
}
7890
- li := int (bi ) - mb .cache .off
7891
- if li >= len (mb .cache .buf ) {
7892
- ss .First = ss .Last
7893
- return
7875
+ for slot := startSlot ; slot < len (mb .cache .idx ); slot ++ {
7876
+ bi := mb .cache .idx [slot ] &^ hbit
7877
+ if bi == dbit {
7878
+ // delete marker so skip.
7879
+ continue
7880
+ }
7881
+ li := int (bi ) - mb .cache .off
7882
+ if li >= len (mb .cache .buf ) {
7883
+ ss .First = ss .Last
7884
+ return
7885
+ }
7886
+ buf := mb .cache .buf [li :]
7887
+ hdr := buf [:msgHdrSize ]
7888
+ slen := int (le .Uint16 (hdr [20 :]))
7889
+ if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7890
+ seq := le .Uint64 (hdr [4 :])
7891
+ if seq < fseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7892
+ continue
7893
+ }
7894
+ ss .First = seq
7895
+ if ss .Msgs == 1 {
7896
+ ss .Last = seq
7897
+ ss .lastNeedsUpdate = false
7898
+ return
7899
+ }
7900
+ // Skip the start slot ahead, if we need to recalculate last we can stop early.
7901
+ startSlot = slot
7902
+ break
7903
+ }
7904
+ }
7905
+ }
7906
+ if ss .lastNeedsUpdate {
7907
+ // Mark last as updated.
7908
+ ss .lastNeedsUpdate = false
7909
+
7910
+ lseq := ss .Last - 1
7911
+ if mbLseq := atomic .LoadUint64 (& mb .last .seq ); lseq > mbLseq {
7912
+ lseq = mbLseq
7894
7913
}
7895
- buf := mb .cache .buf [li :]
7896
- hdr := buf [:msgHdrSize ]
7897
- slen := int (le .Uint16 (hdr [20 :]))
7898
- if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7899
- seq := le .Uint64 (hdr [4 :])
7900
- if seq < fseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7914
+ for slot := endSlot ; slot >= startSlot ; slot -- {
7915
+ bi := mb .cache .idx [slot ] &^ hbit
7916
+ if bi == dbit {
7917
+ // delete marker so skip.
7901
7918
continue
7902
7919
}
7903
- ss .First = seq
7904
- if ss .Msgs == 1 {
7920
+ li := int (bi ) - mb .cache .off
7921
+ if li >= len (mb .cache .buf ) {
7922
+ // Can't overwrite ss.Last, just skip.
7923
+ return
7924
+ }
7925
+ buf := mb .cache .buf [li :]
7926
+ hdr := buf [:msgHdrSize ]
7927
+ slen := int (le .Uint16 (hdr [20 :]))
7928
+ if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7929
+ seq := le .Uint64 (hdr [4 :])
7930
+ if seq > lseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7931
+ continue
7932
+ }
7933
+ // Sequence should never be lower, but guard against it nonetheless.
7934
+ if seq < ss .First {
7935
+ seq = ss .First
7936
+ }
7905
7937
ss .Last = seq
7938
+ if ss .Msgs == 1 {
7939
+ ss .First = seq
7940
+ ss .firstNeedsUpdate = false
7941
+ }
7942
+ return
7906
7943
}
7907
- return
7908
7944
}
7909
7945
}
7910
7946
}
0 commit comments