Skip to content

Commit c79d802

Browse files
committed
Strip unnecessary ClientInfo fields from stream & consumer assignment proposals
Signed-off-by: Neil Twigg <[email protected]>
1 parent c558b24 commit c79d802

File tree

2 files changed

+34
-17
lines changed

2 files changed

+34
-17
lines changed

server/events.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,17 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo {
336336
}
337337
}
338338

339+
// forProposal returns the minimum amount of ClientInfo we need for assignment proposals.
340+
func (ci *ClientInfo) forProposal() *ClientInfo {
341+
if ci == nil {
342+
return nil
343+
}
344+
cci := *ci
345+
cci.Jwt = _EMPTY_
346+
cci.IssuerKey = _EMPTY_
347+
return &cci
348+
}
349+
339350
// ServerStats hold various statistics that we will periodically send out.
340351
type ServerStats struct {
341352
Start time.Time `json:"start"`

server/jetstream_cluster.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7297,23 +7297,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset
72977297
}
72987298

72997299
func encodeAddStreamAssignment(sa *streamAssignment) []byte {
7300+
csa := *sa
7301+
csa.Client = csa.Client.forProposal()
73007302
var bb bytes.Buffer
73017303
bb.WriteByte(byte(assignStreamOp))
7302-
json.NewEncoder(&bb).Encode(sa)
7304+
json.NewEncoder(&bb).Encode(csa)
73037305
return bb.Bytes()
73047306
}
73057307

73067308
func encodeUpdateStreamAssignment(sa *streamAssignment) []byte {
7309+
csa := *sa
7310+
csa.Client = csa.Client.forProposal()
73077311
var bb bytes.Buffer
73087312
bb.WriteByte(byte(updateStreamOp))
7309-
json.NewEncoder(&bb).Encode(sa)
7313+
json.NewEncoder(&bb).Encode(csa)
73107314
return bb.Bytes()
73117315
}
73127316

73137317
func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
7318+
csa := *sa
7319+
csa.Client = csa.Client.forProposal()
73147320
var bb bytes.Buffer
73157321
bb.WriteByte(byte(removeStreamOp))
7316-
json.NewEncoder(&bb).Encode(sa)
7322+
json.NewEncoder(&bb).Encode(csa)
73177323
return bb.Bytes()
73187324
}
73197325

@@ -7723,16 +7729,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
77237729
}
77247730

77257731
func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
7732+
cca := *ca
7733+
cca.Client = cca.Client.forProposal()
77267734
var bb bytes.Buffer
77277735
bb.WriteByte(byte(assignConsumerOp))
7728-
json.NewEncoder(&bb).Encode(ca)
7736+
json.NewEncoder(&bb).Encode(cca)
77297737
return bb.Bytes()
77307738
}
77317739

77327740
func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte {
7741+
cca := *ca
7742+
cca.Client = cca.Client.forProposal()
77337743
var bb bytes.Buffer
77347744
bb.WriteByte(byte(removeConsumerOp))
7735-
json.NewEncoder(&bb).Encode(ca)
7745+
json.NewEncoder(&bb).Encode(cca)
77367746
return bb.Bytes()
77377747
}
77387748

@@ -7743,25 +7753,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
77437753
}
77447754

77457755
func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
7746-
b, err := json.Marshal(ca)
7747-
if err != nil {
7748-
return nil
7749-
}
7750-
// TODO(dlc) - Streaming better approach here probably.
7756+
cca := *ca
7757+
cca.Client = cca.Client.forProposal()
77517758
var bb bytes.Buffer
77527759
bb.WriteByte(byte(assignCompressedConsumerOp))
7753-
bb.Write(s2.Encode(nil, b))
7760+
s2e := s2.NewWriter(&bb)
7761+
json.NewEncoder(s2e).Encode(cca)
7762+
s2e.Close()
77547763
return bb.Bytes()
77557764
}
77567765

77577766
func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) {
77587767
var ca consumerAssignment
7759-
js, err := s2.Decode(nil, buf)
7760-
if err != nil {
7761-
return nil, err
7762-
}
7763-
err = json.Unmarshal(js, &ca)
7764-
return &ca, err
7768+
bb := bytes.NewBuffer(buf)
7769+
s2d := s2.NewReader(bb)
7770+
return &ca, json.NewDecoder(s2d).Decode(&ca)
77657771
}
77667772

77677773
var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")

0 commit comments

Comments
 (0)