Skip to content

[FIXED] workqueue reset to 0 when blk file is zero-sized due to unflushed data on crash #6882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,18 +475,20 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}

// Check if our prior state remembers a last sequence past where we can see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
if prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if fs.state.Msgs == 0 {
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.FirstTime = time.Time{}
}
if _, err := fs.newMsgBlockForWrite(); err == nil {
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
if fs.ld != nil {
if _, err := fs.newMsgBlockForWrite(); err == nil {
if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil {
return nil, err
}
} else {
return nil, err
}
} else {
return nil, err
}
}
// Since we recovered here, make sure to kick ourselves to write out our stream state.
Expand Down
30 changes: 30 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9555,3 +9555,33 @@ func TestFileStoreAllLastSeqs(t *testing.T) {
require_NoError(t, err)
require_True(t, reflect.DeepEqual(seqs, expected))
}

func TestFileStoreRecoverDoesNotResetStreamState(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 5 * time.Second, Retention: WorkQueuePolicy}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
cfg)

require_NoError(t, err)
defer fs.Stop()

subj, msg := "foo", []byte("Hello World")
toStore := 500
for i := 0; i < toStore; i++ {
_, _, err := fs.StoreMsg(subj, nil, msg, 0)
require_NoError(t, err)
}
time.Sleep(5 * time.Second)
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil) //Expire all messages so stream does not hold any message, this is to simulate consumer consuming all messages.
require_NoError(t, err)
require_NoError(t, fs.Stop()) //To Ensure there is a state file created
require_True(t, len(fs.blks) == 1) //Since all messages are expire there should be only 1 blk file exist
os.Remove(fs.blks[0].mfn) // we can change it to have a consumer and consumer all messages too, but removing blk files will simulate same behavior

//Now at this point stream has only index.db file and no blk files as all are deleted. previously it used to reset the stream state to 0
// now it will use index.db to populate stream state if could not be recovered from blk files.
fs, err = newFileStoreWithCreated(fs.fcfg, cfg, time.Now(), prf(&fs.fcfg), nil)
require_NoError(t, err)
require_True(t, fs.state.FirstSeq|fs.state.LastSeq != 0)

}