Skip to content

Only encode & send advisories when there is interest #6341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 26 additions & 58 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,8 +1588,23 @@ func (o *consumer) unsubscribe(sub *subscription) {

// We need to make sure we protect access to the outq.
// Do all advisory sends here.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
o.outq.sendMsg(subj, msg)
func (o *consumer) sendAdvisory(subject string, e any) {
if o.acc == nil {
return
}

// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) {
return
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.outq.sendMsg(subject, j)
}

func (o *consumer) sendDeleteAdvisoryLocked() {
Expand All @@ -1605,13 +1620,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did this here on purpose..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it matters where the marshaling takes place, but if we can avoid doing it until after the interest check, that saves us some CPU time & allocs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do it after the check for sure. But I need to recall why I put it there? Maybe use more cores?

if err != nil {
return
}

subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

func (o *consumer) sendPinnedAdvisoryLocked(group string) {
Expand All @@ -1629,13 +1639,8 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) {
Group: group,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)

}
func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
Expand All @@ -1653,13 +1658,8 @@ func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
Reason: reason,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerUnpinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)

}

Expand All @@ -1679,13 +1679,8 @@ func (o *consumer) sendCreateAdvisory() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
Expand All @@ -1705,13 +1700,8 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
e.Paused = time.Now().Before(e.PauseUntil)
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

// Created returns created time.
Expand Down Expand Up @@ -2652,12 +2642,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.nakEventT, j)
o.sendAdvisory(o.nakEventT, e)

// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
Expand Down Expand Up @@ -2732,15 +2717,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}

subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
return ackedInPlace
}

Expand Down Expand Up @@ -3052,12 +3030,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.ackEventT, j)
o.sendAdvisory(o.ackEventT, e)
}

// Process an ACK.
Expand Down Expand Up @@ -3946,12 +3919,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.deliveryExcEventT, j)
o.sendAdvisory(o.deliveryExcEventT, e)
}

// Check if the candidate subject matches a filter if its present.
Expand Down
31 changes: 31 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5229,3 +5229,34 @@ func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) {
require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF)
})
}

func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

subj := "$JS.ADVISORY.TEST"
s1 := c.servers[0]
s2 := c.servers[1]

// On the first server, see if we think the advisory will be published.
require_False(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))

// On the second server, subscribe to the advisory subject.
nc, _ := jsClientConnect(t, s2)
defer nc.Close()

_, err := nc.Subscribe(subj, func(_ *nats.Msg) {})
require_NoError(t, err)

// Wait for the interest to propagate to the first server.
checkFor(t, time.Second, 25*time.Millisecond, func() error {
if !s1.GlobalAccount().sl.HasInterest(subj) {
return fmt.Errorf("expected interest in %q, not yet found", subj)
}
return nil
})

// On the first server, try and publish the advisory again. THis time
// it should succeed.
require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
}
14 changes: 12 additions & 2 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ import (
"time"
)

func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
// publishAdvisory sends the given advisory into the account. Returns true if
// it was sent, false if not (i.e. due to lack of interest or a marshal error).
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool {
if acc == nil {
acc = s.SystemAccount()
if acc == nil {
return
return false
}
}

// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) {
return false
}

ej, err := json.Marshal(adv)
if err == nil {
err = s.sendInternalAccountMsg(acc, subject, ej)
Expand All @@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
} else {
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
}
return err == nil
}

// JSAPIAudit is an advisory about administrative actions taken on JetStream
Expand Down
Loading