Skip to content

Commit 624b120

Browse files
Cherry-picks for 2.10.23-RC.4 (#6162)
Includes: - #6147 - #6150 - #6151 - #6153 - #6154 - #6146 - #6139 - #6152 - #6157 - #6161 Signed-off-by: Neil Twigg <[email protected]>
2 parents 77896a2 + 60b2d33 commit 624b120

23 files changed

+1515
-245
lines changed

server/client.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4684,19 +4684,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
46844684
continue
46854685
}
46864686
// Remember that leaf in case we don't find any other candidate.
4687+
// We already start randomly in lqs slice, so we don't need
4688+
// to do a random swap if we already have an rsub like we do
4689+
// when src == ROUTER above.
46874690
if rsub == nil {
46884691
rsub = sub
46894692
}
46904693
continue
46914694
} else {
4692-
// We would be picking a route, but if we had remembered a "hub" leaf,
4693-
// then pick that one instead of the route.
4694-
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
4695-
break
4695+
// We want to favor qsubs in our own cluster. If the routed
4696+
// qsub has an origin, it means that is on behalf of a leaf.
4697+
// We need to treat it differently.
4698+
if len(sub.origin) > 0 {
4699+
// If we already have an rsub, nothing to do. Also, do
4700+
// not pick a routed qsub for a LEAF origin cluster
4701+
// that is the same as where the message comes from.
4702+
if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) {
4703+
rsub = sub
4704+
}
4705+
continue
46964706
}
4707+
// This is a qsub that is local on the remote server (or
4708+
// we are connected to an older server and we don't know).
4709+
// Pick this one and be done.
46974710
rsub = sub
4711+
break
46984712
}
4699-
break
47004713
}
47014714

47024715
// Assume delivery subject is normal subject to this point.

server/const.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ const (
171171
// MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto.
172172
MAX_HPUB_ARGS = 4
173173

174+
// MAX_RSUB_ARGS Maximum possible number of arguments from an RS+/LS+ proto.
175+
MAX_RSUB_ARGS = 6
176+
174177
// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
175178
DEFAULT_MAX_CLOSED_CLIENTS = 10000
176179

server/consumer.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ func checkConsumerCfg(
505505
}
506506

507507
// Check if we have a BackOff defined that MaxDeliver is within range etc.
508-
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && config.MaxDeliver <= lbo {
508+
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver {
509509
return NewJSConsumerMaxDeliverBackoffError()
510510
}
511511

@@ -1843,7 +1843,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
18431843
}
18441844

18451845
// Check if BackOff is defined, MaxDeliver is within range.
1846-
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && ncfg.MaxDeliver <= lbo {
1846+
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
18471847
return NewJSConsumerMaxDeliverBackoffError()
18481848
}
18491849

@@ -2206,9 +2206,7 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
22062206
n += binary.PutUvarint(b[n:], dc)
22072207
n += binary.PutVarint(b[n:], ts)
22082208
o.propose(b[:n])
2209-
}
2210-
if o.store != nil {
2211-
// Update local state always.
2209+
} else if o.store != nil {
22122210
o.store.UpdateDelivered(dseq, sseq, dc, ts)
22132211
}
22142212
// Update activity.
@@ -3839,7 +3837,7 @@ func (o *consumer) checkAckFloor() {
38393837
// We will set it explicitly to 1 behind our current lowest in pending, or if
38403838
// pending is empty, to our current delivered -1.
38413839
const minOffThreshold = 50
3842-
if o.asflr < ss.FirstSeq-minOffThreshold {
3840+
if ss.FirstSeq >= minOffThreshold && o.asflr < ss.FirstSeq-minOffThreshold {
38433841
var psseq, pdseq uint64
38443842
for seq, p := range o.pending {
38453843
if psseq == 0 || seq < psseq {
@@ -5245,12 +5243,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
52455243
if dflag {
52465244
n.Delete()
52475245
} else {
5248-
// Try to install snapshot on clean exit
5249-
if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
5250-
if snap, err := o.store.EncodedState(); err == nil {
5251-
n.InstallSnapshot(snap)
5252-
}
5253-
}
52545246
n.Stop()
52555247
}
52565248
}
@@ -5595,8 +5587,9 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
55955587

55965588
o.mu.Lock()
55975589
// Update our check floor.
5598-
if seq > o.chkflr {
5599-
o.chkflr = seq
5590+
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
5591+
if asflr+1 > o.chkflr {
5592+
o.chkflr = asflr + 1
56005593
}
56015594
// See if we need to process this update if our parent stream is not a limits policy stream.
56025595
state, _ = o.store.State()

server/filestore.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9320,14 +9320,6 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
93209320
}
93219321

93229322
func (o *consumerFileStore) Update(state *ConsumerState) error {
9323-
o.mu.Lock()
9324-
defer o.mu.Unlock()
9325-
9326-
// Check to see if this is an outdated update.
9327-
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
9328-
return nil
9329-
}
9330-
93319323
// Sanity checks.
93329324
if state.AckFloor.Consumer > state.Delivered.Consumer {
93339325
return fmt.Errorf("bad ack floor for consumer")
@@ -9355,6 +9347,15 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
93559347
}
93569348
}
93579349

9350+
// Replace our state.
9351+
o.mu.Lock()
9352+
defer o.mu.Unlock()
9353+
9354+
// Check to see if this is an outdated update.
9355+
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
9356+
return fmt.Errorf("old update ignored")
9357+
}
9358+
93589359
o.state.Delivered = state.Delivered
93599360
o.state.AckFloor = state.AckFloor
93609361
o.state.Pending = pending

server/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1900,7 +1900,7 @@ func (c *client) processGatewayAccountSub(accName string) error {
19001900
// the sublist if present.
19011901
// <Invoked from outbound connection's readLoop>
19021902
func (c *client) processGatewayRUnsub(arg []byte) error {
1903-
accName, subject, queue, err := c.parseUnsubProto(arg)
1903+
_, accName, subject, queue, err := c.parseUnsubProto(arg, true, false)
19041904
if err != nil {
19051905
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
19061906
}

server/jetstream_cluster.go

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2405,7 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
24052405
// fully recovered from disk.
24062406
isRecovering := true
24072407

2408-
// Should only to be called from leader.
24092408
doSnapshot := func() {
24102409
if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
24112410
return
@@ -4939,23 +4938,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
49394938
}
49404939

49414940
// Process the change.
4942-
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
4941+
if err := js.processConsumerLeaderChange(o, isLeader); err == nil {
49434942
// Check our state if we are under an interest based stream.
49444943
if mset := o.getStream(); mset != nil {
49454944
var ss StreamState
49464945
mset.store.FastState(&ss)
49474946
o.checkStateForInterestStream(&ss)
49484947
}
4949-
// Do a snapshot.
4950-
doSnapshot(true)
4951-
// Synchronize followers to our state. Only send out if we have state and nothing pending.
4952-
if n != nil {
4953-
if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
4954-
if snap, err := o.store.EncodedState(); err == nil {
4955-
n.SendSnapshot(snap)
4956-
}
4957-
}
4958-
}
49594948
}
49604949

49614950
// We may receive a leader change after the consumer assignment which would cancel us
@@ -5110,25 +5099,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
51105099
buf := e.Data
51115100
switch entryOp(buf[0]) {
51125101
case updateDeliveredOp:
5113-
// These are handled in place in leaders.
5114-
if !isLeader {
5115-
dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:])
5116-
if err != nil {
5117-
if mset, node := o.streamAndNode(); mset != nil && node != nil {
5118-
s := js.srv
5119-
s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]",
5120-
mset.account(), mset.name(), o, node.Group())
5121-
}
5122-
panic(err.Error())
5123-
}
5124-
// Make sure to update delivered under the lock.
5125-
o.mu.Lock()
5126-
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
5127-
o.ldt = time.Now()
5128-
o.mu.Unlock()
5129-
if err != nil {
5130-
panic(err.Error())
5102+
dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:])
5103+
if err != nil {
5104+
if mset, node := o.streamAndNode(); mset != nil && node != nil {
5105+
s := js.srv
5106+
s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]",
5107+
mset.account(), mset.name(), o, node.Group())
51315108
}
5109+
panic(err.Error())
5110+
}
5111+
// Make sure to update delivered under the lock.
5112+
o.mu.Lock()
5113+
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
5114+
o.ldt = time.Now()
5115+
o.mu.Unlock()
5116+
if err != nil {
5117+
panic(err.Error())
51325118
}
51335119
case updateAcksOp:
51345120
dseq, sseq, err := decodeAckUpdate(buf[1:])

server/jetstream_cluster_3_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3543,7 +3543,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
35433543
s.WaitForShutdown()
35443544
}
35453545

3546-
// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
3546+
// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
35473547
// it could miss the signal of a message going away. If that message was pending and expires the
35483548
// ack floor could fall below the stream first sequence. This test will force that condition and
35493549
// make sure the system resolves itself.
@@ -3566,7 +3566,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
35663566
sub, err := js.PullSubscribe("foo", "C")
35673567
require_NoError(t, err)
35683568

3569-
for i := 0; i < 10; i++ {
3569+
// Publish as many messages as the ack floor check threshold +5.
3570+
totalMessages := 55
3571+
for i := 0; i < totalMessages; i++ {
35703572
sendStreamMsg(t, nc, "foo", "HELLO")
35713573
}
35723574

@@ -3610,10 +3612,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
36103612
o := mset.lookupConsumer("C")
36113613
require_NotNil(t, o)
36123614
o.mu.Lock()
3613-
err = o.setStoreState(state)
3615+
o.applyState(state)
36143616
cfs := o.store.(*consumerFileStore)
36153617
o.mu.Unlock()
3616-
require_NoError(t, err)
36173618
// The lower layer will ignore, so set more directly.
36183619
cfs.mu.Lock()
36193620
cfs.state = *state
@@ -3631,10 +3632,10 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
36313632
ci, err := js.ConsumerInfo("TEST", "C")
36323633
require_NoError(t, err)
36333634
// Make sure we catch this and adjust.
3634-
if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 {
3635+
if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 {
36353636
return nil
36363637
}
3637-
return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor)
3638+
return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor)
36383639
})
36393640
}
36403641

0 commit comments

Comments (0)