Skip to content

Commit 3c15bd9

Browse files
Cherry-picks for 2.11.4-RC.2 (#6910)
Includes the following: - #6895 - #6899 - #6900 - #6908 Signed-off-by: Neil Twigg <[email protected]>
2 parents 99fe8db + 4ad6bed commit 3c15bd9

File tree

6 files changed

+55
-54
lines changed

6 files changed

+55
-54
lines changed

server/events_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2344,6 +2344,10 @@ func TestServerEventsHealthZClustered(t *testing.T) {
23442344
t.Fatalf("Error creating consumer: %v", err)
23452345
}
23462346

2347+
c.waitOnStreamLeader("ONE", "test")
2348+
c.waitOnConsumerLeader("ONE", "test", "cons")
2349+
c.waitOnAllCurrent()
2350+
23472351
subj := fmt.Sprintf(serverHealthzReqSubj, c.servers[0].ID())
23482352
pingSubj := fmt.Sprintf(serverHealthzReqSubj, "PING")
23492353

server/filestore.go

Lines changed: 29 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2486,7 +2486,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
24862486
mb.mu.Unlock()
24872487
}()
24882488

2489-
fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}
2489+
fseq, isAll := start, filter == _EMPTY_ || filter == fwcs
24902490

24912491
var didLoad bool
24922492
if mb.fssNotLoaded() {
@@ -2514,18 +2514,15 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
25142514
}
25152515
}
25162516
// Make sure to start at mb.first.seq if fseq < mb.first.seq
2517-
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
2518-
fseq = seq
2519-
}
2517+
fseq = max(fseq, atomic.LoadUint64(&mb.first.seq))
25202518
lseq := atomic.LoadUint64(&mb.last.seq)
25212519

25222520
// Optionally build the isMatch for wildcard filters.
2523-
_tsa, _fsa := [32]string{}, [32]string{}
2524-
tsa, fsa := _tsa[:0], _fsa[:0]
25252521
var isMatch func(subj string) bool
25262522
// Decide to build.
25272523
if wc {
2528-
fsa = tokenizeSubjectIntoSlice(fsa[:0], filter)
2524+
_tsa, _fsa := [32]string{}, [32]string{}
2525+
tsa, fsa := _tsa[:0], tokenizeSubjectIntoSlice(_fsa[:0], filter)
25292526
isMatch = func(subj string) bool {
25302527
tsa = tokenizeSubjectIntoSlice(tsa[:0], subj)
25312528
return isSubsetMatchTokenized(tsa, fsa)
@@ -2545,29 +2542,22 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
25452542

25462543
if !doLinearScan {
25472544
// If we have a wildcard match against all tracked subjects we know about.
2548-
if wc {
2549-
subs = subs[:0]
2550-
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, _ *SimpleState) {
2551-
subs = append(subs, string(bsubj))
2552-
})
2553-
// Check if we matched anything
2554-
if len(subs) == 0 {
2555-
return nil, didLoad, ErrStoreMsgNotFound
2556-
}
2557-
}
25582545
fseq = lseq + 1
2559-
for _, subj := range subs {
2560-
ss, _ := mb.fss.Find(stringToBytes(subj))
2561-
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
2562-
mb.recalculateForSubj(subj, ss)
2563-
}
2564-
if ss == nil || start > ss.Last || ss.First >= fseq {
2565-
continue
2546+
if bfilter := stringToBytes(filter); wc {
2547+
mb.fss.Match(bfilter, func(bsubj []byte, ss *SimpleState) {
2548+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
2549+
mb.recalculateForSubj(bytesToString(bsubj), ss)
2550+
}
2551+
if start <= ss.Last {
2552+
fseq = min(fseq, max(start, ss.First))
2553+
}
2554+
})
2555+
} else if ss, _ := mb.fss.Find(bfilter); ss != nil {
2556+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
2557+
mb.recalculateForSubj(filter, ss)
25662558
}
2567-
if ss.First < start {
2568-
fseq = start
2569-
} else {
2570-
fseq = ss.First
2559+
if start <= ss.Last {
2560+
fseq = min(fseq, max(start, ss.First))
25712561
}
25722562
}
25732563
}
@@ -2576,13 +2566,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
25762566
return nil, didLoad, ErrStoreMsgNotFound
25772567
}
25782568

2579-
// If we guess to not do a linear scan, but the above resulted in alot of subs that will
2580-
// need to be checked for every scanned message, revert.
2581-
// TODO(dlc) - we could memoize the subs across calls.
2582-
if !doLinearScan && len(subs) > int(lseq-fseq) {
2583-
doLinearScan = true
2584-
}
2585-
25862569
// Need messages loaded from here on out.
25872570
if mb.cacheNotLoaded() {
25882571
if err := mb.loadMsgsWithLock(); err != nil {
@@ -2615,18 +2598,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
26152598
if isAll {
26162599
return fsm, expireOk, nil
26172600
}
2618-
if doLinearScan {
2619-
if wc && isMatch(sm.subj) {
2620-
return fsm, expireOk, nil
2621-
} else if !wc && fsm.subj == filter {
2622-
return fsm, expireOk, nil
2623-
}
2624-
} else {
2625-
for _, subj := range subs {
2626-
if fsm.subj == subj {
2627-
return fsm, expireOk, nil
2628-
}
2629-
}
2601+
if wc && isMatch(sm.subj) {
2602+
return fsm, expireOk, nil
2603+
} else if !wc && fsm.subj == filter {
2604+
return fsm, expireOk, nil
26302605
}
26312606
// If we are here we did not match, so put the llseq back.
26322607
mb.llseq = llseq
@@ -3042,7 +3017,13 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
30423017
func (fs *fileStore) AllLastSeqs() ([]uint64, error) {
30433018
fs.mu.RLock()
30443019
defer fs.mu.RUnlock()
3020+
return fs.allLastSeqsLocked()
3021+
}
30453022

3023+
// allLastSeqsLocked will return a sorted list of last sequences for all
3024+
// subjects, but won't take the lock to do it, to avoid the issue of compounding
3025+
// read locks causing a deadlock with a write lock.
3026+
func (fs *fileStore) allLastSeqsLocked() ([]uint64, error) {
30463027
if fs.state.Msgs == 0 || fs.noTrackSubjects() {
30473028
return nil, nil
30483029
}
@@ -3113,7 +3094,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i
31133094

31143095
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
31153096
if maxSeq == 0 && maxAllowed <= 0 && fs.filterIsAll(filters) {
3116-
return fs.AllLastSeqs()
3097+
return fs.allLastSeqsLocked()
31173098
}
31183099

31193100
lastBlkIndex := len(fs.blks) - 1

server/leafnode_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9965,7 +9965,13 @@ func TestLeafNodePermissionWithLiteralSubjectAndQueueInterest(t *testing.T) {
99659965
ncHub := natsConnect(t, hub.ClientURL(), nats.UserInfo("user", "pwd"))
99669966
defer ncHub.Close()
99679967

9968-
resp, err := ncHub.Request("my.subject", []byte("hello"), time.Second)
9969-
require_NoError(t, err)
9968+
var resp *nats.Msg
9969+
var err error
9970+
checkFor(t, 5*time.Second, time.Second, func() error {
9971+
// Make sure we don't fail the test on the first "no responders", might
9972+
// take time for the sub to propagate.
9973+
resp, err = ncHub.Request("my.subject", []byte("hello"), time.Second)
9974+
return err
9975+
})
99709976
require_Equal(t, "OK", string(resp.Data))
99719977
}

server/memstore.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,13 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
641641
func (ms *memStore) AllLastSeqs() ([]uint64, error) {
642642
ms.mu.RLock()
643643
defer ms.mu.RUnlock()
644+
return ms.allLastSeqsLocked()
645+
}
644646

647+
// allLastSeqsLocked will return a sorted list of last sequences for all
648+
// subjects, but won't take the lock to do it, to avoid the issue of compounding
649+
// read locks causing a deadlock with a write lock.
650+
func (ms *memStore) allLastSeqsLocked() ([]uint64, error) {
645651
if len(ms.msgs) == 0 {
646652
return nil, nil
647653
}
@@ -685,7 +691,7 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in
685691

686692
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
687693
if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) {
688-
return ms.AllLastSeqs()
694+
return ms.allLastSeqsLocked()
689695
}
690696

691697
// Implied last sequence.

server/norace_2_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3421,6 +3421,10 @@ func TestNoRaceAccessTimeLeakCheck(t *testing.T) {
34213421
s.Shutdown()
34223422
s.WaitForShutdown()
34233423

3424-
ngra := runtime.NumGoroutine()
3425-
require_Equal(t, ngrp, ngra)
3424+
checkFor(t, 5*time.Second, time.Second, func() error {
3425+
if ngra := runtime.NumGoroutine(); ngrp != ngra {
3426+
return fmt.Errorf("expected %d, got %d", ngrp, ngra)
3427+
}
3428+
return nil
3429+
})
34263430
}

server/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var (
5555
// while a snapshot is in progress.
5656
ErrStoreSnapshotInProgress = errors.New("snapshot in progress")
5757
// ErrMsgTooLarge is returned when a message is considered too large.
58-
ErrMsgTooLarge = errors.New("message to large")
58+
ErrMsgTooLarge = errors.New("message too large")
5959
// ErrStoreWrongType is for when you access the wrong storage type.
6060
ErrStoreWrongType = errors.New("wrong storage type")
6161
// ErrNoAckPolicy is returned when trying to update a consumer's acks with no ack policy.

0 commit comments

Comments
 (0)