Skip to content

Commit facc0a5

Browse files
[FIXED] Attempt stream snapshot on shutdown (#6942)
When shutting down, the stream would not be snapshotted. Either the monitor would close first and `doSnapshot` would not be called, or there would be a race with the Raft node being stopped and `InstallSnapshot` becoming a no-op. Relates to #6279. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 274ea2c + d12018f commit facc0a5

File tree

4 files changed

+66
-2
lines changed

4 files changed

+66
-2
lines changed

server/jetstream_cluster.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2417,6 +2417,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
24172417
doSnapshot()
24182418
return
24192419
case <-mqch:
2420+
// Clean signal from shutdown routine so do best effort attempt to snapshot.
2421+
doSnapshot()
24202422
return
24212423
case <-qch:
24222424
// Clean signal from shutdown routine so do best effort attempt to snapshot.

server/jetstream_cluster_1_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8464,6 +8464,63 @@ func TestJetStreamClusterOfflineR1ConsumerDenyUpdate(t *testing.T) {
84648464
require_Error(t, err, NewJSConsumerOfflineError())
84658465
}
84668466

8467+
func TestJetStreamClusterSnapshotStreamAssetOnShutdown(t *testing.T) {
8468+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8469+
defer c.shutdown()
8470+
8471+
nc, js := jsClientConnect(t, c.randomServer())
8472+
defer nc.Close()
8473+
8474+
_, err := js.AddStream(&nats.StreamConfig{
8475+
Name: "TEST",
8476+
Subjects: []string{"foo"},
8477+
Replicas: 3,
8478+
})
8479+
require_NoError(t, err)
8480+
8481+
var sds []string
8482+
for _, s := range c.servers {
8483+
sds = append(sds, s.StoreDir())
8484+
}
8485+
8486+
for _, sd := range sds {
8487+
matches, err := filepath.Glob(filepath.Join(sd, "$SYS", "_js_", "*", snapshotsDir, "*"))
8488+
require_NoError(t, err)
8489+
require_True(t, len(matches) > 0)
8490+
for _, match := range matches {
8491+
require_NoError(t, os.RemoveAll(match))
8492+
}
8493+
}
8494+
8495+
// Publish, so we have something new to snapshot.
8496+
_, err = js.Publish("foo", nil)
8497+
require_NoError(t, err)
8498+
8499+
// Shutdown servers, and check if all made stream snapshots.
8500+
for _, s := range c.servers {
8501+
s.Shutdown()
8502+
}
8503+
for _, sd := range sds {
8504+
matches, err := filepath.Glob(filepath.Join(sd, "$SYS", "_js_", "*", snapshotsDir))
8505+
require_NoError(t, err)
8506+
// Matches _meta_ and stream raft groups.
8507+
require_Len(t, len(matches), 2)
8508+
var foundStream bool
8509+
for _, match := range matches {
8510+
if !strings.Contains(match, "S-R3F") {
8511+
continue
8512+
}
8513+
foundStream = true
8514+
dirs, err := os.ReadDir(match)
8515+
require_NoError(t, err)
8516+
if len(dirs) != 1 {
8517+
t.Errorf("Missing snapshot for %s", match)
8518+
}
8519+
}
8520+
require_True(t, foundStream)
8521+
}
8522+
}
8523+
84678524
//
84688525
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
84698526
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

server/jetstream_cluster_2_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6316,7 +6316,11 @@ func TestJetStreamClusterStreamResetOnExpirationDuringPeerDownAndRestartWithLead
63166316
// Now clear raft WAL.
63176317
mset, err := nsl.GlobalAccount().lookupStream("TEST")
63186318
require_NoError(t, err)
6319-
require_NoError(t, mset.raftNode().InstallSnapshot(mset.stateSnapshot()))
6319+
// Snapshot could already be done during shutdown. If so, snapshotting again will not be available.
6320+
err = mset.raftNode().InstallSnapshot(mset.stateSnapshot())
6321+
if err != nil {
6322+
require_Error(t, err, errNoSnapAvailable)
6323+
}
63206324

63216325
nsl.Shutdown()
63226326
nsl = c.restartServer(nsl)

server/stream.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5788,7 +5788,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
57885788
if deleteFlag {
57895789
n.Delete()
57905790
sa = mset.sa
5791-
} else {
5791+
} else if !isShuttingDown {
5792+
// Stop Raft, unless JetStream is already shutting down, in which case they'll be stopped separately.
57925793
n.Stop()
57935794
}
57945795
}

0 commit comments

Comments
 (0)