Skip to content

Commit 48c3c47

Browse files
committed
Only encode & send advisories when there is interest
Signed-off-by: Neil Twigg <[email protected]>
1 parent 519943f commit 48c3c47

File tree

3 files changed

+69
-60
lines changed

3 files changed

+69
-60
lines changed

server/consumer.go

Lines changed: 26 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1588,8 +1588,23 @@ func (o *consumer) unsubscribe(sub *subscription) {
15881588

15891589
// We need to make sure we protect access to the outq.
15901590
// Do all advisory sends here.
1591-
func (o *consumer) sendAdvisory(subj string, msg []byte) {
1592-
o.outq.sendMsg(subj, msg)
1591+
func (o *consumer) sendAdvisory(subject string, e any) {
1592+
if o.acc == nil {
1593+
return
1594+
}
1595+
1596+
// If there is no one listening for this advisory then save ourselves the effort
1597+
// and don't bother encoding the JSON or sending it.
1598+
if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) {
1599+
return
1600+
}
1601+
1602+
j, err := json.Marshal(e)
1603+
if err != nil {
1604+
return
1605+
}
1606+
1607+
o.outq.sendMsg(subject, j)
15931608
}
15941609

15951610
func (o *consumer) sendDeleteAdvisoryLocked() {
@@ -1605,13 +1620,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
16051620
Domain: o.srv.getOpts().JetStreamDomain,
16061621
}
16071622

1608-
j, err := json.Marshal(e)
1609-
if err != nil {
1610-
return
1611-
}
1612-
16131623
subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
1614-
o.sendAdvisory(subj, j)
1624+
o.sendAdvisory(subj, e)
16151625
}
16161626

16171627
func (o *consumer) sendPinnedAdvisoryLocked(group string) {
@@ -1629,13 +1639,8 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) {
16291639
Group: group,
16301640
}
16311641

1632-
j, err := json.Marshal(e)
1633-
if err != nil {
1634-
return
1635-
}
1636-
16371642
subj := JSAdvisoryConsumerPinnedPre + "." + o.stream + "." + o.name
1638-
o.sendAdvisory(subj, j)
1643+
o.sendAdvisory(subj, e)
16391644

16401645
}
16411646
func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
@@ -1653,13 +1658,8 @@ func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
16531658
Reason: reason,
16541659
}
16551660

1656-
j, err := json.Marshal(e)
1657-
if err != nil {
1658-
return
1659-
}
1660-
16611661
subj := JSAdvisoryConsumerUnpinnedPre + "." + o.stream + "." + o.name
1662-
o.sendAdvisory(subj, j)
1662+
o.sendAdvisory(subj, e)
16631663

16641664
}
16651665

@@ -1679,13 +1679,8 @@ func (o *consumer) sendCreateAdvisory() {
16791679
Domain: o.srv.getOpts().JetStreamDomain,
16801680
}
16811681

1682-
j, err := json.Marshal(e)
1683-
if err != nil {
1684-
return
1685-
}
1686-
16871682
subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
1688-
o.sendAdvisory(subj, j)
1683+
o.sendAdvisory(subj, e)
16891684
}
16901685

16911686
func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
@@ -1705,13 +1700,8 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
17051700
e.Paused = time.Now().Before(e.PauseUntil)
17061701
}
17071702

1708-
j, err := json.Marshal(e)
1709-
if err != nil {
1710-
return
1711-
}
1712-
17131703
subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name
1714-
o.sendAdvisory(subj, j)
1704+
o.sendAdvisory(subj, e)
17151705
}
17161706

17171707
// Created returns created time.
@@ -2652,12 +2642,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
26522642
Domain: o.srv.getOpts().JetStreamDomain,
26532643
}
26542644

2655-
j, err := json.Marshal(e)
2656-
if err != nil {
2657-
return
2658-
}
2659-
2660-
o.sendAdvisory(o.nakEventT, j)
2645+
o.sendAdvisory(o.nakEventT, e)
26612646

26622647
// Check to see if we have delays attached.
26632648
if len(nak) > len(AckNak) {
@@ -2732,15 +2717,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool
27322717
Domain: o.srv.getOpts().JetStreamDomain,
27332718
}
27342719

2735-
j, err := json.Marshal(e)
2736-
if err != nil {
2737-
// We had an error during the marshal, so we can't send the advisory,
2738-
// but we still need to tell the caller that the ack was processed.
2739-
return ackedInPlace
2740-
}
2741-
27422720
subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
2743-
o.sendAdvisory(subj, j)
2721+
o.sendAdvisory(subj, e)
27442722
return ackedInPlace
27452723
}
27462724

@@ -3052,12 +3030,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
30523030
Domain: o.srv.getOpts().JetStreamDomain,
30533031
}
30543032

3055-
j, err := json.Marshal(e)
3056-
if err != nil {
3057-
return
3058-
}
3059-
3060-
o.sendAdvisory(o.ackEventT, j)
3033+
o.sendAdvisory(o.ackEventT, e)
30613034
}
30623035

30633036
// Process an ACK.
@@ -3946,12 +3919,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
39463919
Domain: o.srv.getOpts().JetStreamDomain,
39473920
}
39483921

3949-
j, err := json.Marshal(e)
3950-
if err != nil {
3951-
return
3952-
}
3953-
3954-
o.sendAdvisory(o.deliveryExcEventT, j)
3922+
o.sendAdvisory(o.deliveryExcEventT, e)
39553923
}
39563924

39573925
// Check if the candidate subject matches a filter if its present.

server/jetstream_cluster_4_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5229,3 +5229,34 @@ func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) {
52295229
require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF)
52305230
})
52315231
}
5232+
5233+
func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
5234+
c := createJetStreamClusterExplicit(t, "R3S", 3)
5235+
defer c.shutdown()
5236+
5237+
subj := "$JS.ADVISORY.TEST"
5238+
s1 := c.servers[0]
5239+
s2 := c.servers[1]
5240+
5241+
// On the first server, see if we think the advisory will be published.
5242+
require_False(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
5243+
5244+
// On the second server, subscribe to the advisory subject.
5245+
nc, _ := jsClientConnect(t, s2)
5246+
defer nc.Close()
5247+
5248+
_, err := nc.Subscribe(subj, func(_ *nats.Msg) {})
5249+
require_NoError(t, err)
5250+
5251+
// Wait for the interest to propagate to the first server.
5252+
checkFor(t, time.Second, 25*time.Millisecond, func() error {
5253+
if !s1.GlobalAccount().sl.HasInterest(subj) {
5254+
return fmt.Errorf("expected interest in %q, not yet found", subj)
5255+
}
5256+
return nil
5257+
})
5258+
5259+
// On the first server, try and publish the advisory again. THis time
5260+
// it should succeed.
5261+
require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
5262+
}

server/jetstream_events.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,22 @@ import (
1818
"time"
1919
)
2020

21-
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
21+
// publishAdvisory sends the given advisory into the account. Returns true if
22+
// it was sent, false if not (i.e. due to lack of interest or a marshal error).
23+
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool {
2224
if acc == nil {
2325
acc = s.SystemAccount()
2426
if acc == nil {
25-
return
27+
return false
2628
}
2729
}
30+
31+
// If there is no one listening for this advisory then save ourselves the effort
32+
// and don't bother encoding the JSON or sending it.
33+
if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) {
34+
return false
35+
}
36+
2837
ej, err := json.Marshal(adv)
2938
if err == nil {
3039
err = s.sendInternalAccountMsg(acc, subject, ej)
@@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
3443
} else {
3544
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
3645
}
46+
return err == nil
3747
}
3848

3949
// JSAPIAudit is an advisory about administrative actions taken on JetStream

0 commit comments

Comments
 (0)