
[FIXED] Only recover .blk files in file store #6684


Merged · merged 1 commit · Mar 17, 2025
10 changes: 10 additions & 0 deletions server/filestore.go
@@ -289,6 +289,8 @@ const (
purgeDir = "__msgs__"
// used to scan blk file names.
blkScan = "%d.blk"
+ // suffix of a block file
+ blkSuffix = ".blk"
// used for compacted blocks that are staged.
newScan = "%d.new"
// used to scan index file names.
@@ -1818,6 +1820,10 @@ func (fs *fileStore) recoverFullState() (rerr error) {

var index uint32
for _, fi := range dirs {
+ // Ensure it's actually a block file, otherwise fmt.Sscanf also matches %d.blk.tmp
+ if !strings.HasSuffix(fi.Name(), blkSuffix) {
+ 	continue
+ }
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
if index > blkIndex {
fs.warn("Stream state outdated, found extra blocks, will rebuild")
@@ -2006,6 +2012,10 @@ func (fs *fileStore) recoverMsgs() error {
indices := make(sort.IntSlice, 0, len(dirs))
var index int
for _, fi := range dirs {
+ // Ensure it's actually a block file, otherwise fmt.Sscanf also matches %d.blk.tmp
+ if !strings.HasSuffix(fi.Name(), blkSuffix) {
+ 	continue
+ }
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
indices = append(indices, index)
}
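Side note (not part of the diff): the HasSuffix guard is needed because fmt.Sscanf only requires the format string to be satisfied, not the whole input to be consumed, so a leftover temporary file such as 10.blk.tmp also matches the %d.blk scan format and would previously be picked up during recovery. A minimal standalone sketch illustrating the behavior (the extra file names are just for illustration):

package main

import (
    "fmt"
    "strings"
)

func main() {
    const blkScan = "%d.blk"  // scan format used for block file names
    const blkSuffix = ".blk"  // suffix guard added by this PR

    for _, name := range []string{"10.blk", "10.blk.tmp", "10.blk.random", "index.db"} {
        var index int
        // Sscanf succeeds even when trailing input is left unconsumed,
        // so "10.blk.tmp" still yields n == 1 and a nil error.
        n, err := fmt.Sscanf(name, blkScan, &index)
        scanMatch := err == nil && n == 1
        fmt.Printf("%-14s scan match: %-5v has .blk suffix: %v\n",
            name, scanMatch, strings.HasSuffix(name, blkSuffix))
    }
}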
92 changes: 80 additions & 12 deletions server/filestore_test.go
@@ -5292,12 +5292,14 @@ func TestFileStoreFullStateBasics(t *testing.T) {
msgZ := bytes.Repeat([]byte("Z"), msgLen)

// Send 2 msgs and stop, check for presence of our full state file.
- fs.StoreMsg(subj, nil, msgA, 0)
- fs.StoreMsg(subj, nil, msgZ, 0)
- require_True(t, fs.numMsgBlocks() == 1)
+ _, _, err = fs.StoreMsg(subj, nil, msgA, 0)
+ require_NoError(t, err)
+ _, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+ require_NoError(t, err)
+ require_Equal(t, fs.numMsgBlocks(), 1)

// Make sure there is a full state file after we do a stop.
- fs.Stop()
+ require_NoError(t, fs.Stop())

sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
if _, err := os.Stat(sfile); err != nil {
@@ -5317,10 +5319,10 @@ func TestFileStoreFullStateBasics(t *testing.T) {
// Make sure there are no old idx or fss files.
matches, err := filepath.Glob(filepath.Join(fcfg.StoreDir, msgDir, "%d.fss"))
require_NoError(t, err)
- require_True(t, len(matches) == 0)
+ require_Equal(t, len(matches), 0)
matches, err = filepath.Glob(filepath.Join(fcfg.StoreDir, msgDir, "%d.idx"))
require_NoError(t, err)
- require_True(t, len(matches) == 0)
+ require_Equal(t, len(matches), 0)

state := fs.State()
require_Equal(t, state.Msgs, 2)
@@ -5338,10 +5340,11 @@ func TestFileStoreFullStateBasics(t *testing.T) {
require_True(t, bytes.Equal(sm.msg, msgZ))

// Now add in 1 more here to split the lmb.
- fs.StoreMsg(subj, nil, msgZ, 0)
+ _, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+ require_NoError(t, err)

// Now stop the filestore and replace the old stream state and make sure we recover correctly.
- fs.Stop()
+ require_NoError(t, fs.Stop())

// Regrab the stream state
buf, err = os.ReadFile(sfile)
@@ -5352,8 +5355,9 @@ func TestFileStoreFullStateBasics(t *testing.T) {
defer fs.Stop()

// Add in one more.
- fs.StoreMsg(subj, nil, msgZ, 0)
- fs.Stop()
+ _, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+ require_NoError(t, err)
+ require_NoError(t, fs.Stop())

// Put old stream state back with only 3.
err = os.WriteFile(sfile, buf, defaultFilePerms)
@@ -5381,8 +5385,9 @@ func TestFileStoreFullStateBasics(t *testing.T) {
require_Equal(t, psi.lblk, 2)

// Store 1 more
- fs.StoreMsg(subj, nil, msgA, 0)
- fs.Stop()
+ _, _, err = fs.StoreMsg(subj, nil, msgA, 0)
+ require_NoError(t, err)
+ require_NoError(t, fs.Stop())
// Put old stream state back with only 3.
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)
@@ -8952,3 +8957,66 @@ func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
require_Equal(t, lseq, 1)
require_Len(t, dmaps, 0)
}

func TestFileStoreRecoverOnlyBlkFiles(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
created := time.Now()
fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

_, _, err = fs.StoreMsg("foo", nil, nil, 0)
require_NoError(t, err)

// Confirm state as baseline.
before := fs.State()
require_Equal(t, before.Msgs, 1)
require_Equal(t, before.FirstSeq, 1)
require_Equal(t, before.LastSeq, 1)

// Restart should equal state.
require_NoError(t, fs.Stop())
fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

if state := fs.State(); !reflect.DeepEqual(state, before) {
t.Fatalf("Expected state of %+v, got %+v", before, state)
}

// Stream state should exist.
_, err = os.Stat(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
require_NoError(t, err)

// Stop and write some random files that contain ".blk" but are not block files; they should be ignored.
require_NoError(t, fs.Stop())
require_NoError(t, os.WriteFile(filepath.Join(fs.fcfg.StoreDir, msgDir, "10.blk.random"), nil, defaultFilePerms))
require_NoError(t, os.WriteFile(filepath.Join(fs.fcfg.StoreDir, msgDir, fmt.Sprintf("10.blk.%s", compressTmpSuffix)), nil, defaultFilePerms))

fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

// The random files would previously cause the stream state to be deleted.
_, err = os.Stat(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
require_NoError(t, err)

if state := fs.State(); !reflect.DeepEqual(state, before) {
t.Fatalf("Expected state of %+v, got %+v", before, state)
}

// Stop and remove stream state file.
require_NoError(t, fs.Stop())
require_NoError(t, os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)))

// Recovering based on blocks should also ignore the random files.
fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
require_NoError(t, err)
defer fs.Stop()

if state := fs.State(); !reflect.DeepEqual(state, before) {
t.Fatalf("Expected state of %+v, got %+v", before, state)
}
})
}