Skip to content

[FIXED] Attempt stream snapshot on shutdown #6942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2417,6 +2417,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
doSnapshot()
return
case <-mqch:
// Clean signal from shutdown routine so do best effort attempt to snapshot.
doSnapshot()
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot.
Expand Down
57 changes: 57 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8464,6 +8464,63 @@ func TestJetStreamClusterOfflineR1ConsumerDenyUpdate(t *testing.T) {
require_Error(t, err, NewJSConsumerOfflineError())
}

func TestJetStreamClusterSnapshotStreamAssetOnShutdown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

var sds []string
for _, s := range c.servers {
sds = append(sds, s.StoreDir())
}

for _, sd := range sds {
matches, err := filepath.Glob(filepath.Join(sd, "$SYS", "_js_", "*", snapshotsDir, "*"))
require_NoError(t, err)
require_True(t, len(matches) > 0)
for _, match := range matches {
require_NoError(t, os.RemoveAll(match))
}
}

// Publish, so we have something new to snapshot.
_, err = js.Publish("foo", nil)
require_NoError(t, err)

// Shutdown servers, and check if all made stream snapshots.
for _, s := range c.servers {
s.Shutdown()
}
for _, sd := range sds {
matches, err := filepath.Glob(filepath.Join(sd, "$SYS", "_js_", "*", snapshotsDir))
require_NoError(t, err)
// Matches _meta_ and stream raft groups.
require_Len(t, len(matches), 2)
var foundStream bool
for _, match := range matches {
if !strings.Contains(match, "S-R3F") {
continue
}
foundStream = true
dirs, err := os.ReadDir(match)
require_NoError(t, err)
if len(dirs) != 1 {
t.Errorf("Missing snapshot for %s", match)
}
}
require_True(t, foundStream)
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6316,7 +6316,11 @@ func TestJetStreamClusterStreamResetOnExpirationDuringPeerDownAndRestartWithLead
// Now clear raft WAL.
mset, err := nsl.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
require_NoError(t, mset.raftNode().InstallSnapshot(mset.stateSnapshot()))
// Snapshot could already be done during shutdown. If so, snapshotting again will not be available.
err = mset.raftNode().InstallSnapshot(mset.stateSnapshot())
if err != nil {
require_Error(t, err, errNoSnapAvailable)
}

nsl.Shutdown()
nsl = c.restartServer(nsl)
Expand Down
3 changes: 2 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5788,7 +5788,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
} else if !isShuttingDown {
// Stop Raft, unless JetStream is already shutting down, in which case they'll be stopped separately.
n.Stop()
}
}
Expand Down
Loading