diff --git a/server/filestore.go b/server/filestore.go index df1ce61c434..e62ed5b7b1a 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -475,18 +475,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Check if our prior state remembers a last sequence past where we can see. - if fs.ld != nil && prior.LastSeq > fs.state.LastSeq { + if prior.LastSeq > fs.state.LastSeq { fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime if fs.state.Msgs == 0 { fs.state.FirstSeq = fs.state.LastSeq + 1 fs.state.FirstTime = time.Time{} } - if _, err := fs.newMsgBlockForWrite(); err == nil { - if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + if fs.ld != nil { + if _, err := fs.newMsgBlockForWrite(); err == nil { + if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + return nil, err + } + } else { return nil, err } - } else { - return nil, err } } // Since we recovered here, make sure to kick ourselves to write out our stream state. diff --git a/server/filestore_test.go b/server/filestore_test.go index 74665610afd..f3aede141d2 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9555,3 +9555,33 @@ func TestFileStoreAllLastSeqs(t *testing.T) { require_NoError(t, err) require_True(t, reflect.DeepEqual(seqs, expected)) } + +func TestFileStoreRecoverDoesNotResetStreamState(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 5 * time.Second, Retention: WorkQueuePolicy} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + toStore := 500 + for i := 0; i < toStore; i++ { + _, _, err := fs.StoreMsg(subj, nil, msg, 0) + require_NoError(t, err) + } + time.Sleep(5 * time.Second) + fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil) //Expire all messages so stream does not hold any message, this is to simulate consumer consuming all messages. + require_NoError(t, err) + require_NoError(t, fs.Stop()) //To Ensure there is a state file created + require_True(t, len(fs.blks) == 1) //Since all messages are expire there should be only 1 blk file exist + os.Remove(fs.blks[0].mfn) // we can change it to have a consumer and consumer all messages too, but removing blk files will simulate same behavior + + //Now at this point stream has only index.db file and no blk files as all are deleted. previously it used to reset the stream state to 0 + // now it will use index.db to populate stream state if could not be recovered from blk files. + fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil) + require_NoError(t, err) + require_True(t, fs.state.FirstSeq|fs.state.LastSeq != 0) + +}