Skip to content

Commit bcfafd3

Browse files
Ensure deleteNotActive stops on JetStream or server shutdown (#6351)
This ensures that we stop these goroutines more quickly when either the server or the JetStream system shuts down. This also avoids a race in `TestJetStreamClusterGhostEphemeralsAfterRestart`, as the unit test was reverting a modified constant before these goroutines would have finished in some cases. Signed-off-by: Neil Twigg <[email protected]>
2 parents 4e40b92 + 7828bd4 commit bcfafd3

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

server/consumer.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1853,6 +1853,13 @@ func (o *consumer) deleteNotActive() {
18531853

18541854
s, js := o.mset.srv, o.srv.js.Load()
18551855
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
1856+
var qch, cqch chan struct{}
1857+
if o.srv != nil {
1858+
qch = o.srv.quitCh
1859+
}
1860+
if o.js != nil {
1861+
cqch = o.js.clusterQuitC()
1862+
}
18561863
o.mu.Unlock()
18571864

18581865
// Useful for pprof.
@@ -1891,7 +1898,14 @@ func (o *consumer) deleteNotActive() {
18911898
interval := consumerNotActiveStartInterval + jitter
18921899
ticker := time.NewTicker(interval)
18931900
defer ticker.Stop()
1894-
for range ticker.C {
1901+
for {
1902+
select {
1903+
case <-ticker.C:
1904+
case <-qch:
1905+
return
1906+
case <-cqch:
1907+
return
1908+
}
18951909
js.mu.RLock()
18961910
if js.shuttingDown {
18971911
js.mu.RUnlock()

0 commit comments

Comments
 (0)