
Commit 71ba974

[FIXED] Don't InstallSnapshot during shutdown, would race with monitorStream/monitorConsumer (#6153)
When stopping a stream or consumer, we would attempt to install a snapshot. However, this would race with what's happening in `monitorStream`/`monitorConsumer` at that time. For example:

1. In `applyStreamEntries` we call into `mset.processJetStreamMsg` to persist one or multiple messages.
2. We call `mset.stop(..)` either before or during the above.
3. In `mset.stop(..)` we'd wait for `mset.processJetStreamMsg` to release the lock so we can enter `mset.stateSnapshotLocked()`. **We create a snapshot with new state here!**
4. Now we call into `InstallSnapshot` to persist the above snapshot, but `n.applied` does not contain the right value; the value will be lower.
5. Then `applyStreamEntries` finishes and we end with calling `n.Applied(..)`.

This would be a race condition depending on whether 4 happened before or after 5.

It's essential that the snapshot we make is aligned with the `n.applied` value. If it isn't, we'll replay entries and need to increase `mset.clfs`, which snowballs into stream desync due to this shift.

The only place where we can guarantee that the snapshot and applied are aligned is in `doSnapshot` of `monitorStream` and `monitorConsumer` (and `monitorCluster`), so we must not attempt installing snapshots outside of those.

Signed-off-by: Maurice van Veen <[email protected]>
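To make the ordering concrete, here is a minimal, runnable sketch of the race. The `node` type and its `Applied`/`InstallSnapshot` methods below are simplified stand-ins for the server's RAFT layer, not its actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// node is a stand-in for the RAFT layer; only the applied-index
// bookkeeping relevant to this race is modeled here.
type node struct {
	mu      sync.Mutex
	applied uint64 // highest log index reported back via Applied(..)
}

// Applied is what the monitor goroutine calls after applying entries
// (step 5 above).
func (n *node) Applied(index uint64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if index > n.applied {
		n.applied = index
	}
}

// InstallSnapshot stamps the snapshot with whatever n.applied holds right
// now (step 4 above). If the encoded state already contains entries beyond
// n.applied, those entries get replayed on restart against state that
// already includes them.
func (n *node) InstallSnapshot(stateThrough uint64) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	if stateThrough > n.applied {
		return fmt.Errorf("misaligned snapshot: state through index %d, applied only %d",
			stateThrough, n.applied)
	}
	return nil
}

func main() {
	n := &node{}
	n.Applied(10) // the monitor loop has applied entries 1..10

	// processJetStreamMsg has already persisted entry 11 when stop(..)
	// snapshots the new state (step 3) and installs it (step 4) ...
	if err := n.InstallSnapshot(11); err != nil {
		fmt.Println(err) // ... before the monitor loop reaches n.Applied(11)
	}

	n.Applied(11) // step 5 arrives too late; the snapshot was already stamped
}
```

The fix removes the `InstallSnapshot` calls from the stop paths entirely, so only the monitor goroutines, which also own the `n.Applied(..)` calls, ever snapshot.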
2 parents 28c581b + 6938f97 commit 71ba974

File tree: 4 files changed, +185 −9 lines changed


server/consumer.go

Lines changed: 0 additions & 6 deletions
@@ -5700,12 +5700,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 	if dflag {
 		n.Delete()
 	} else {
-		// Try to install snapshot on clean exit
-		if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
-			if snap, err := o.store.EncodedState(); err == nil {
-				n.InstallSnapshot(snap)
-			}
-		}
 		n.Stop()
 	}
 }

server/jetstream_cluster.go

Lines changed: 0 additions & 1 deletion
@@ -2425,7 +2425,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// fully recovered from disk.
 	isRecovering := true
 
-	// Should only to be called from leader.
 	doSnapshot := func() {
 		if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
 			return
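For context, a rough sketch of why snapshotting from the monitor loop avoids the race entirely: the goroutine that applies entries and calls `n.Applied(..)` is the same one that takes the snapshot, so the captured state can never run ahead of the applied index. All identifiers below are placeholders, not the server's actual API:

```go
package main

import (
	"fmt"
	"time"
)

// Placeholders only; none of these are the server's actual types.
type committedEntry struct{ index uint64 }

type node struct{ applied uint64 }

func (n *node) Applied(i uint64) { n.applied = i }

func (n *node) InstallSnapshot(stateThrough uint64) {
	fmt.Printf("snapshot: state through %d, applied %d (always aligned)\n",
		stateThrough, n.applied)
}

// monitorLoop mirrors the shape of monitorStream/monitorConsumer: applying
// entries, calling Applied, and snapshotting all happen on one goroutine,
// so the snapshot can never observe state ahead of n.applied.
func monitorLoop(n *node, applyC <-chan committedEntry, done <-chan struct{}) {
	var stateThrough uint64 // index covered by the in-memory state
	snapTicker := time.NewTicker(10 * time.Millisecond)
	defer snapTicker.Stop()
	for {
		select {
		case ce := <-applyC:
			stateThrough = ce.index // "apply" the entry
			n.Applied(ce.index)     // report back immediately afterwards
		case <-snapTicker.C:
			// Runs strictly between entries on this same goroutine.
			n.InstallSnapshot(stateThrough)
		case <-done:
			return
		}
	}
}

func main() {
	n := &node{}
	applyC := make(chan committedEntry)
	done := make(chan struct{})
	go func() {
		for i := uint64(1); i <= 3; i++ {
			applyC <- committedEntry{index: i}
			time.Sleep(15 * time.Millisecond)
		}
		close(done)
	}()
	monitorLoop(n, applyC, done)
}
```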

server/jetstream_cluster_4_test.go

Lines changed: 185 additions & 0 deletions
@@ -4585,3 +4585,188 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
 		}
 	}
 }
+
+func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+
+	// Wait for all servers to have applied everything.
+	var maxApplied uint64
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		maxApplied = 0
+		for _, s := range c.servers {
+			acc, err := s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err := acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			_, _, applied := mset.node.Progress()
+			if maxApplied == 0 {
+				maxApplied = applied
+			} else if applied < maxApplied {
+				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
+			} else if applied > maxApplied {
+				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
+			}
+		}
+		return nil
+	})
+
+	// Install a snapshot on a follower.
+	s := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	err = mset.node.InstallSnapshot(mset.stateSnapshotLocked())
+	require_NoError(t, err)
+
+	// Validate the snapshot reflects applied.
+	validateStreamState := func(snap *snapshot) {
+		t.Helper()
+		require_Equal(t, snap.lastIndex, maxApplied)
+		ss, err := DecodeStreamState(snap.data)
+		require_NoError(t, err)
+		require_Equal(t, ss.FirstSeq, 1)
+		require_Equal(t, ss.LastSeq, 1)
+	}
+	snap, err := mset.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+
+	// Simulate a message being stored, but not calling Applied yet.
+	err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil)
+	require_NoError(t, err)
+
+	// Simulate the stream being stopped before we're able to call Applied.
+	// If we'd install a snapshot during this, which would be a race condition,
+	// we'd store a snapshot with state that's ahead of applied.
+	err = mset.stop(false, false)
+	require_NoError(t, err)
+
+	// Validate the snapshot is the same as before.
+	snap, err = mset.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+}
+
+func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "CONSUMER",
+		Replicas:  3,
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	sub, err := js.PullSubscribe("foo", "CONSUMER")
+	require_NoError(t, err)
+	msgs, err := sub.Fetch(1)
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 1)
+	err = msgs[0].AckSync()
+	require_NoError(t, err)
+
+	// Wait for all servers to have applied everything.
+	var maxApplied uint64
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		maxApplied = 0
+		for _, s := range c.servers {
+			acc, err := s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err := acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			o := mset.lookupConsumer("CONSUMER")
+			if o == nil {
+				return errors.New("consumer not found")
+			}
+			_, _, applied := o.node.Progress()
+			if maxApplied == 0 {
+				maxApplied = applied
+			} else if applied < maxApplied {
+				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
+			} else if applied > maxApplied {
+				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
+			}
+		}
+		return nil
+	})
+
+	// Install a snapshot on a follower.
+	s := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	o := mset.lookupConsumer("CONSUMER")
+	require_NotNil(t, o)
+	snapBytes, err := o.store.EncodedState()
+	require_NoError(t, err)
+	err = o.node.InstallSnapshot(snapBytes)
+	require_NoError(t, err)
+
+	// Validate the snapshot reflects applied.
+	validateStreamState := func(snap *snapshot) {
+		t.Helper()
+		require_Equal(t, snap.lastIndex, maxApplied)
+		state, err := decodeConsumerState(snap.data)
+		require_NoError(t, err)
+		require_Equal(t, state.Delivered.Consumer, 1)
+		require_Equal(t, state.Delivered.Stream, 1)
+	}
+	snap, err := o.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+
+	// Simulate a message being delivered, but not calling Applied yet.
+	err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano())
+	require_NoError(t, err)
+
+	// Simulate the consumer being stopped before we're able to call Applied.
+	// If we'd install a snapshot during this, which would be a race condition,
+	// we'd store a snapshot with state that's ahead of applied.
+	err = o.stop()
+	require_NoError(t, err)
+
+	// Validate the snapshot is the same as before.
+	snap, err = o.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+}
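Assuming the usual nats-server repository layout, these two tests can be run in isolation with something like `go test -race -run 'TestJetStreamClusterDontInstallSnapshotWhenStopping' ./server`.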

server/stream.go

Lines changed: 0 additions & 2 deletions
@@ -5563,8 +5563,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		n.Delete()
 		sa = mset.sa
 	} else {
-		// Always attempt snapshot on clean exit.
-		n.InstallSnapshot(mset.stateSnapshotLocked())
 		n.Stop()
 	}
 }
