
Cherry-picks for 2.10.23-RC.4 #6162


Merged 14 commits on Nov 22, 2024
23 changes: 18 additions & 5 deletions server/client.go
@@ -4684,19 +4684,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
// Remember that leaf in case we don't find any other candidate.
// We already start randomly in lqs slice, so we don't need
// to do a random swap if we already have an rsub like we do
// when src == ROUTER above.
if rsub == nil {
rsub = sub
}
continue
} else {
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
// We want to favor qsubs in our own cluster. If the routed
// qsub has an origin, it means that it is on behalf of a leaf.
// We need to treat it differently.
if len(sub.origin) > 0 {
// If we already have an rsub, nothing to do. Also, do
// not pick a routed qsub for a LEAF origin cluster
// that is the same as where the message comes from.
if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) {
rsub = sub
}
continue
}
// This is a qsub that is local on the remote server (or
// we are connected to an older server and we don't know).
// Pick this one and be done.
rsub = sub
break
}
break
}

// Assume delivery subject is normal subject to this point.
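
The hunk above changes how `processMsgResults` picks a queue-group member among routed candidates. A minimal sketch of the selection rule, with simplified types; only `leafOrigin` and `sub.origin` come from the diff, everything else is assumed for illustration:

```go
package sketch

// qsub is a toy stand-in for a routed queue subscription.
type qsub struct {
	origin string // non-empty when the routed qsub is on behalf of a leaf cluster
}

// pick favors qsubs that are local on the remote server: the first one
// found wins outright. A leaf-origin routed qsub is only remembered as a
// fallback, and never when its origin matches the cluster the message
// came from, which would bounce the message back where it originated.
func pick(candidates []qsub, leafOrigin string) *qsub {
	var fallback *qsub
	for i := range candidates {
		sub := &candidates[i]
		if sub.origin != "" {
			if fallback == nil && (leafOrigin == "" || leafOrigin != sub.origin) {
				fallback = sub
			}
			continue
		}
		return sub // local on the remote server (or an older server): done
	}
	return fallback
}
```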
3 changes: 3 additions & 0 deletions server/const.go
@@ -171,6 +171,9 @@ const (
// MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto.
MAX_HPUB_ARGS = 4

// MAX_RSUB_ARGS Maximum possible number of arguments from a RS+/LS+ proto.
MAX_RSUB_ARGS = 6

// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
DEFAULT_MAX_CLOSED_CLIENTS = 10000

21 changes: 7 additions & 14 deletions server/consumer.go
@@ -505,7 +505,7 @@ func checkConsumerCfg(
}

// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && config.MaxDeliver <= lbo {
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
}

@@ -1843,7 +1843,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
}

// Check if BackOff is defined, MaxDeliver is within range.
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && ncfg.MaxDeliver <= lbo {
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
}
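
Both `BackOff` hunks relax the same validation: the old `MaxDeliver <= lbo` also rejected a `MaxDeliver` exactly equal to `len(BackOff)`, while the new `lbo > MaxDeliver` accepts it. A sketch of the boundary under that reading (function name and shape assumed, not taken from the server code):

```go
package main

import (
	"fmt"
	"time"
)

// validBackoff mirrors the relaxed check: with a non-empty BackOff list,
// MaxDeliver must be unlimited (-1) or at least len(BackOff), so every
// redelivery has a backoff entry available.
func validBackoff(maxDeliver int, backoff []time.Duration) bool {
	lbo := len(backoff)
	return lbo == 0 || maxDeliver == -1 || lbo <= maxDeliver
}

func main() {
	bo := []time.Duration{time.Second, 5 * time.Second, 30 * time.Second}
	fmt.Println(validBackoff(3, bo))  // true: equal lengths are now accepted
	fmt.Println(validBackoff(2, bo))  // false: more backoff steps than deliveries
	fmt.Println(validBackoff(-1, bo)) // true: unlimited deliveries
}
```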

@@ -2206,9 +2206,7 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
n += binary.PutUvarint(b[n:], dc)
n += binary.PutVarint(b[n:], ts)
o.propose(b[:n])
}
if o.store != nil {
// Update local state always.
} else if o.store != nil {
o.store.UpdateDelivered(dseq, sseq, dc, ts)
}
// Update activity.
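
With the new `else`, a replicated consumer only proposes the delivered update and relies on the Raft apply path (see the `applyConsumerEntries` hunk in `server/jetstream_cluster.go` below) to write the store on leaders and followers alike; only non-replicated consumers still write directly. A sketch of that split, with assumed interfaces:

```go
package sketch

// proposer and deliveredStore are assumed stand-ins for the consumer's
// Raft node and its store.
type proposer interface{ Propose(entry []byte) error }

type deliveredStore interface {
	UpdateDelivered(dseq, sseq, dc uint64, ts int64) error
}

// recordDelivered proposes when replicated, leaving the store write to the
// apply path on every replica; otherwise it writes the store directly.
func recordDelivered(node proposer, store deliveredStore, entry []byte,
	dseq, sseq, dc uint64, ts int64) error {
	if node != nil {
		return node.Propose(entry)
	}
	if store != nil {
		return store.UpdateDelivered(dseq, sseq, dc, ts)
	}
	return nil
}
```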
@@ -3839,7 +3837,7 @@ func (o *consumer) checkAckFloor() {
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
const minOffThreshold = 50
if o.asflr < ss.FirstSeq-minOffThreshold {
if ss.FirstSeq >= minOffThreshold && o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
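
The added `ss.FirstSeq >= minOffThreshold` guard is an unsigned-arithmetic fix: sequences are `uint64`, so on a young stream with `FirstSeq < 50` the subtraction wrapped around to an enormous value and the drift check fired when it should not have. A self-contained demonstration:

```go
package main

import "fmt"

const minOffThreshold = 50

// driftedOld reproduces the unguarded check: firstSeq-minOffThreshold
// wraps around when firstSeq < 50, so nearly any ack floor looks
// "too far behind" on a young stream.
func driftedOld(asflr, firstSeq uint64) bool {
	return asflr < firstSeq-minOffThreshold
}

// drifted adds the guard from the diff, keeping the subtraction safe.
func drifted(asflr, firstSeq uint64) bool {
	return firstSeq >= minOffThreshold && asflr < firstSeq-minOffThreshold
}

func main() {
	fmt.Println(driftedOld(5, 10)) // true: 10-50 wrapped to 18446744073709551576
	fmt.Println(drifted(5, 10))    // false
}
```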
@@ -5245,12 +5243,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if dflag {
n.Delete()
} else {
// Try to install snapshot on clean exit
if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
if snap, err := o.store.EncodedState(); err == nil {
n.InstallSnapshot(snap)
}
}
n.Stop()
}
}
@@ -5595,8 +5587,9 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {

o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
if asflr+1 > o.chkflr {
o.chkflr = asflr + 1
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
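
Previously the check floor advanced to the last sequence scanned, which could move it past sequences that were still pending; as the new comment says, later calls would then resume beyond them and skip work. The clamp keeps the floor at most one past the ack floor. A minimal restatement (names from the diff):

```go
package sketch

// advanceCheckFloor mirrors the clamp in checkStateForInterestStream:
// the floor only moves up to ack floor + 1, so the next scan starts at
// the first sequence that could still need processing.
func advanceCheckFloor(chkflr, asflr uint64) uint64 {
	if next := asflr + 1; next > chkflr {
		return next
	}
	return chkflr
}
```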
17 changes: 9 additions & 8 deletions server/filestore.go
@@ -9320,14 +9320,6 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
}

func (o *consumerFileStore) Update(state *ConsumerState) error {
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return nil
}

// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
@@ -9355,6 +9347,15 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
}
}

// Replace our state.
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return fmt.Errorf("old update ignored")
}

o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
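
The `Update` hunks reorder the method: the sanity checks now run before the lock is taken, and a stale state (a regressing delivered sequence or ack floor) is rejected with an explicit error instead of a silent `return nil`, so callers and tests can tell that an old update was dropped. The same pattern in a self-contained toy form (types and fields simplified):

```go
package sketch

import (
	"errors"
	"sync"
)

// floors is a toy stand-in for the persisted consumer state.
type floors struct {
	mu        sync.Mutex
	delivered uint64
	ackFloor  uint64
}

// update validates first, then rejects regressions loudly under the lock.
func (f *floors) update(delivered, ackFloor uint64) error {
	if ackFloor > delivered {
		return errors.New("bad ack floor for consumer")
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	if delivered < f.delivered || ackFloor < f.ackFloor {
		return errors.New("old update ignored")
	}
	f.delivered, f.ackFloor = delivered, ackFloor
	return nil
}
```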
2 changes: 1 addition & 1 deletion server/gateway.go
@@ -1900,7 +1900,7 @@ func (c *client) processGatewayAccountSub(accName string) error {
// the sublist if present.
// <Invoked from outbound connection's readLoop>
func (c *client) processGatewayRUnsub(arg []byte) error {
accName, subject, queue, err := c.parseUnsubProto(arg)
_, accName, subject, queue, err := c.parseUnsubProto(arg, true, false)
if err != nil {
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
}
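
The call site shows `parseUnsubProto` gaining two boolean arguments and an extra leading return value that the gateway path discards. The parameter meanings are not visible in this diff; given the leaf-origin work elsewhere in the PR (`MAX_RSUB_ARGS`, `sub.origin`), a plausible, purely hypothetical reconstruction is:

```go
package sketch

import (
	"bytes"
	"fmt"
)

// parseUnsubProto is hypothetical: the flags would say whether the proto
// line carries an origin and/or an account name, and callers that do not
// need the origin discard it, as processGatewayRUnsub does with `_`.
func parseUnsubProto(arg []byte, accInProto, hasOrigin bool) (origin, acc, subject, queue []byte, err error) {
	args := bytes.Fields(arg)
	if hasOrigin && len(args) > 0 {
		origin, args = args[0], args[1:]
	}
	if accInProto && len(args) > 0 {
		acc, args = args[0], args[1:]
	}
	switch len(args) {
	case 1:
		subject = args[0]
	case 2:
		subject, queue = args[0], args[1]
	default:
		err = fmt.Errorf("malformed unsub proto: %q", arg)
	}
	return origin, acc, subject, queue, err
}
```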
46 changes: 16 additions & 30 deletions server/jetstream_cluster.go
@@ -2405,7 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// fully recovered from disk.
isRecovering := true

// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
@@ -4939,23 +4938,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}

// Process the change.
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
if err := js.processConsumerLeaderChange(o, isLeader); err == nil {
// Check our state if we are under an interest based stream.
if mset := o.getStream(); mset != nil {
var ss StreamState
mset.store.FastState(&ss)
o.checkStateForInterestStream(&ss)
}
// Do a snapshot.
doSnapshot(true)
// Synchronize followers to our state. Only send out if we have state and nothing pending.
if n != nil {
if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
}
}
}

// We may receive a leader change after the consumer assignment which would cancel us
@@ -5110,25 +5099,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
buf := e.Data
switch entryOp(buf[0]) {
case updateDeliveredOp:
// These are handled in place in leaders.
if !isLeader {
dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:])
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
}
panic(err.Error())
}
// Make sure to update delivered under the lock.
o.mu.Lock()
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
o.ldt = time.Now()
o.mu.Unlock()
if err != nil {
panic(err.Error())
dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:])
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
}
panic(err.Error())
}
// Make sure to update delivered under the lock.
o.mu.Lock()
err = o.store.UpdateDelivered(dseq, sseq, dc, ts)
o.ldt = time.Now()
o.mu.Unlock()
if err != nil {
panic(err.Error())
}
case updateAcksOp:
dseq, sseq, err := decodeAckUpdate(buf[1:])
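
Dropping the `if !isLeader` guard makes `updateDeliveredOp` entries apply on every replica; it is the counterpart of the `updateDelivered` change in `server/consumer.go` above, where the leader stopped writing the store directly alongside its proposal. Together the two hunks leave a single store write path per replica, and since the update is monotonic, re-applying a committed entry is harmless. A toy model of that invariant:

```go
package sketch

// replica models the consumer store on one server.
type replica struct{ delivered uint64 }

// applyDelivered is now the only writer, on leaders and followers alike;
// monotonicity makes a re-applied committed entry a no-op.
func (r *replica) applyDelivered(dseq uint64) {
	if dseq > r.delivered {
		r.delivered = dseq
	}
}
```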
13 changes: 7 additions & 6 deletions server/jetstream_cluster_3_test.go
@@ -3543,7 +3543,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
s.WaitForShutdown()
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
// make sure the system resolves itself.
@@ -3566,7 +3566,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
sub, err := js.PullSubscribe("foo", "C")
require_NoError(t, err)

for i := 0; i < 10; i++ {
// Publish as many messages as the ack floor check threshold +5.
totalMessages := 55
for i := 0; i < totalMessages; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

@@ -3610,10 +3612,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
o := mset.lookupConsumer("C")
require_NotNil(t, o)
o.mu.Lock()
err = o.setStoreState(state)
o.applyState(state)
cfs := o.store.(*consumerFileStore)
o.mu.Unlock()
require_NoError(t, err)
// The lower layer will ignore the old state, so set it more directly.
cfs.mu.Lock()
cfs.state = *state
@@ -3631,10 +3632,10 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
ci, err := js.ConsumerInfo("TEST", "C")
require_NoError(t, err)
// Make sure we catch this and adjust.
if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 {
if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 {
return nil
}
return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor)
return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor)
})
}
