
Commit 8db30d7

[FIXED] Only recover .blk files in file store (#6684)
The file store should only recover `%d.blk` files. While a block file is being compressed, a temporary `%d.blk.tmp` file exists alongside it. During recovery, `fmt.Sscanf` also recognized those temporary files, which resulted in loading the same block file multiple times. Also de-flakes `TestFileStoreFullStateBasics`, as it includes some tests that use compression.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents 6658e68 + c242b57 commit 8db30d7
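
To illustrate the scanning pitfall described in the commit message, here is a minimal standalone sketch (not code from the commit; the constant names simply mirror `blkScan` and `blkSuffix` from `server/filestore.go`). `fmt.Sscanf` stops once its format string is consumed, so a leftover `10.blk.tmp` parses just as successfully as `10.blk`; checking the `.blk` suffix first is what filters such files out.

package main

import (
	"fmt"
	"strings"
)

const (
	blkScan   = "%d.blk" // format used to scan block file names
	blkSuffix = ".blk"   // suffix of a genuine block file
)

func main() {
	for _, name := range []string{"10.blk", "10.blk.tmp", "10.blk.random"} {
		var index int
		// Sscanf parses the leading integer and the literal ".blk", then
		// ignores any trailing characters, so all three names "match".
		n, err := fmt.Sscanf(name, blkScan, &index)
		matched := err == nil && n == 1
		// The suffix check is what actually distinguishes real block files.
		fmt.Printf("%-14s sscanf-matched=%v has-blk-suffix=%v\n",
			name, matched, strings.HasSuffix(name, blkSuffix))
	}
	// Prints sscanf-matched=true for all three names, but only "10.blk"
	// passes the suffix check.
}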

2 files changed (+90, -12 lines)


server/filestore.go

Lines changed: 10 additions & 0 deletions
@@ -289,6 +289,8 @@ const (
 	purgeDir = "__msgs__"
 	// used to scan blk file names.
 	blkScan = "%d.blk"
+	// suffix of a block file
+	blkSuffix = ".blk"
 	// used for compacted blocks that are staged.
 	newScan = "%d.new"
 	// used to scan index file names.

@@ -1818,6 +1820,10 @@ func (fs *fileStore) recoverFullState() (rerr error) {

 	var index uint32
 	for _, fi := range dirs {
+		// Ensure it's actually a block file, otherwise fmt.Sscanf also matches %d.blk.tmp
+		if !strings.HasSuffix(fi.Name(), blkSuffix) {
+			continue
+		}
 		if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
 			if index > blkIndex {
 				fs.warn("Stream state outdated, found extra blocks, will rebuild")

@@ -2006,6 +2012,10 @@ func (fs *fileStore) recoverMsgs() error {
 	indices := make(sort.IntSlice, 0, len(dirs))
 	var index int
 	for _, fi := range dirs {
+		// Ensure it's actually a block file, otherwise fmt.Sscanf also matches %d.blk.tmp
+		if !strings.HasSuffix(fi.Name(), blkSuffix) {
+			continue
+		}
 		if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
 			indices = append(indices, index)
 		}

server/filestore_test.go

Lines changed: 80 additions & 12 deletions
@@ -5292,12 +5292,14 @@ func TestFileStoreFullStateBasics(t *testing.T) {
 	msgZ := bytes.Repeat([]byte("Z"), msgLen)

 	// Send 2 msgs and stop, check for presence of our full state file.
-	fs.StoreMsg(subj, nil, msgA, 0)
-	fs.StoreMsg(subj, nil, msgZ, 0)
-	require_True(t, fs.numMsgBlocks() == 1)
+	_, _, err = fs.StoreMsg(subj, nil, msgA, 0)
+	require_NoError(t, err)
+	_, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+	require_NoError(t, err)
+	require_Equal(t, fs.numMsgBlocks(), 1)

 	// Make sure there is a full state file after we do a stop.
-	fs.Stop()
+	require_NoError(t, fs.Stop())

 	sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
 	if _, err := os.Stat(sfile); err != nil {

@@ -5317,10 +5319,10 @@ func TestFileStoreFullStateBasics(t *testing.T) {
 	// Make sure there are no old idx or fss files.
 	matches, err := filepath.Glob(filepath.Join(fcfg.StoreDir, msgDir, "%d.fss"))
 	require_NoError(t, err)
-	require_True(t, len(matches) == 0)
+	require_Equal(t, len(matches), 0)
 	matches, err = filepath.Glob(filepath.Join(fcfg.StoreDir, msgDir, "%d.idx"))
 	require_NoError(t, err)
-	require_True(t, len(matches) == 0)
+	require_Equal(t, len(matches), 0)

 	state := fs.State()
 	require_Equal(t, state.Msgs, 2)

@@ -5338,10 +5340,11 @@ func TestFileStoreFullStateBasics(t *testing.T) {
 	require_True(t, bytes.Equal(sm.msg, msgZ))

 	// Now add in 1 more here to split the lmb.
-	fs.StoreMsg(subj, nil, msgZ, 0)
+	_, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+	require_NoError(t, err)

 	// Now stop the filestore and replace the old stream state and make sure we recover correctly.
-	fs.Stop()
+	require_NoError(t, fs.Stop())

 	// Regrab the stream state
 	buf, err = os.ReadFile(sfile)

@@ -5352,8 +5355,9 @@ func TestFileStoreFullStateBasics(t *testing.T) {
 	defer fs.Stop()

 	// Add in one more.
-	fs.StoreMsg(subj, nil, msgZ, 0)
-	fs.Stop()
+	_, _, err = fs.StoreMsg(subj, nil, msgZ, 0)
+	require_NoError(t, err)
+	require_NoError(t, fs.Stop())

 	// Put old stream state back with only 3.
 	err = os.WriteFile(sfile, buf, defaultFilePerms)

@@ -5381,8 +5385,9 @@ func TestFileStoreFullStateBasics(t *testing.T) {
 	require_Equal(t, psi.lblk, 2)

 	// Store 1 more
-	fs.StoreMsg(subj, nil, msgA, 0)
-	fs.Stop()
+	_, _, err = fs.StoreMsg(subj, nil, msgA, 0)
+	require_NoError(t, err)
+	require_NoError(t, fs.Stop())
 	// Put old stream state back with only 3.
 	err = os.WriteFile(sfile, buf, defaultFilePerms)
 	require_NoError(t, err)

@@ -8952,3 +8957,66 @@ func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
 	require_Equal(t, lseq, 1)
 	require_Len(t, dmaps, 0)
 }
+
+func TestFileStoreRecoverOnlyBlkFiles(t *testing.T) {
+	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+		cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}
+		created := time.Now()
+		fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		_, _, err = fs.StoreMsg("foo", nil, nil, 0)
+		require_NoError(t, err)
+
+		// Confirm state as baseline.
+		before := fs.State()
+		require_Equal(t, before.Msgs, 1)
+		require_Equal(t, before.FirstSeq, 1)
+		require_Equal(t, before.LastSeq, 1)
+
+		// Restart should equal state.
+		require_NoError(t, fs.Stop())
+		fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		if state := fs.State(); !reflect.DeepEqual(state, before) {
+			t.Fatalf("Expected state of %+v, got %+v", before, state)
+		}
+
+		// Stream state should exist.
+		_, err = os.Stat(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
+		require_NoError(t, err)
+
+		// Stop and write some random files, but containing ".blk", should be ignored.
+		require_NoError(t, fs.Stop())
+		require_NoError(t, os.WriteFile(filepath.Join(fs.fcfg.StoreDir, msgDir, "10.blk.random"), nil, defaultFilePerms))
+		require_NoError(t, os.WriteFile(filepath.Join(fs.fcfg.StoreDir, msgDir, fmt.Sprintf("10.blk.%s", compressTmpSuffix)), nil, defaultFilePerms))
+
+		fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		// The random files would previously result in stream state to be deleted.
+		_, err = os.Stat(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
+		require_NoError(t, err)
+
+		if state := fs.State(); !reflect.DeepEqual(state, before) {
+			t.Fatalf("Expected state of %+v, got %+v", before, state)
+		}
+
+		// Stop and remove stream state file.
+		require_NoError(t, fs.Stop())
+		require_NoError(t, os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)))
+
+		// Recovering based on blocks should also ignore the random files.
+		fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		if state := fs.State(); !reflect.DeepEqual(state, before) {
+			t.Fatalf("Expected state of %+v, got %+v", before, state)
+		}
+	})
+}
