diff --git a/server/filestore.go b/server/filestore.go index 392e71351d8..5ad6442ac3d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -648,6 +648,13 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { return err } + // Create or delete the THW if needed. + if cfg.AllowMsgTTL && fs.ttls == nil { + fs.ttls = thw.NewHashWheel() + } else if !cfg.AllowMsgTTL && fs.ttls != nil { + fs.ttls = nil + } + // Limits checks and enforcement. fs.enforceMsgLimit() fs.enforceBytesLimit() @@ -666,7 +673,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { } fs.mu.Unlock() - if cfg.MaxAge != 0 { + if cfg.MaxAge != 0 || cfg.AllowMsgTTL { fs.expireMsgs() } return nil diff --git a/server/filestore_test.go b/server/filestore_test.go index 78ccf4861ab..22592e3f8e4 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9607,3 +9607,23 @@ func TestFileStoreAccessTimeSpinUp(t *testing.T) { ngra := runtime.NumGoroutine() require_Equal(t, ngr, ngra) } + +func TestFileStoreUpdateConfigTTLState(t *testing.T) { + cfg := StreamConfig{ + Name: "zzz", + Subjects: []string{">"}, + Storage: FileStorage, + } + fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg) + require_NoError(t, err) + defer fs.Stop() + require_Equal(t, fs.ttls, nil) + + cfg.AllowMsgTTL = true + require_NoError(t, fs.UpdateConfig(&cfg)) + require_NotEqual(t, fs.ttls, nil) + + cfg.AllowMsgTTL = false + require_NoError(t, fs.UpdateConfig(&cfg)) + require_Equal(t, fs.ttls, nil) +} diff --git a/server/memstore.go b/server/memstore.go index 4da9adff467..581edb5e562 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -83,6 +83,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { ms.mu.Lock() ms.cfg = *cfg + // Create or delete the THW if needed. + if cfg.AllowMsgTTL && ms.ttls == nil { + ms.ttls = thw.NewHashWheel() + } else if !cfg.AllowMsgTTL && ms.ttls != nil { + ms.ttls = nil + } // Limits checks and enforcement. ms.enforceMsgLimit() ms.enforceBytesLimit() @@ -112,7 +118,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error { } ms.mu.Unlock() - if cfg.MaxAge != 0 { + if cfg.MaxAge != 0 || cfg.AllowMsgTTL { ms.expireMsgs() } return nil diff --git a/server/memstore_test.go b/server/memstore_test.go index 77ab3a2c22c..2f95ee3eb8b 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1292,6 +1292,26 @@ func TestMemStoreAllLastSeqs(t *testing.T) { require_True(t, reflect.DeepEqual(seqs, expected)) } +func TestMemStoreUpdateConfigTTLState(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{">"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + require_Equal(t, ms.ttls, nil) + + cfg.AllowMsgTTL = true + require_NoError(t, ms.UpdateConfig(cfg)) + require_NotEqual(t, ms.ttls, nil) + + cfg.AllowMsgTTL = false + require_NoError(t, ms.UpdateConfig(cfg)) + require_Equal(t, ms.ttls, nil) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/store_test.go b/server/store_test.go index 05f09993811..32d567481aa 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -18,6 +18,7 @@ package server import ( "fmt" "testing" + "time" ) func testAllStoreAllPermutations(t *testing.T, compressionAndEncryption bool, cfg StreamConfig, fn func(t *testing.T, fs StreamStore)) { @@ -535,3 +536,50 @@ func TestStorePurgeExZero(t *testing.T) { }, ) } + +func TestStoreUpdateConfigTTLState(t *testing.T) { + config := func() StreamConfig { + return StreamConfig{Name: "TEST", Subjects: []string{"foo"}} + } + testAllStoreAllPermutations( + t, false, config(), + func(t *testing.T, fs StreamStore) { + cfg := config() + switch fs.(type) { + case *fileStore: + cfg.Storage = FileStorage + case *memStore: + cfg.Storage = MemoryStorage + } + + // TTLs disabled at this point so this message should survive. + seq, _, err := fs.StoreMsg("foo", nil, nil, 1) + require_NoError(t, err) + time.Sleep(2 * time.Second) + _, err = fs.LoadMsg(seq, nil) + require_NoError(t, err) + + // Now enable TTLs. + cfg.AllowMsgTTL = true + require_NoError(t, fs.UpdateConfig(&cfg)) + + // TTLs enabled at this point so this message should be cleaned up. + seq, _, err = fs.StoreMsg("foo", nil, nil, 1) + require_NoError(t, err) + time.Sleep(2 * time.Second) + _, err = fs.LoadMsg(seq, nil) + require_Error(t, err) + + // Now disable TTLs again. + cfg.AllowMsgTTL = false + require_NoError(t, fs.UpdateConfig(&cfg)) + + // TTLs disabled again so this message should survive. + seq, _, err = fs.StoreMsg("foo", nil, nil, 1) + require_NoError(t, err) + time.Sleep(2 * time.Second) + _, err = fs.LoadMsg(seq, nil) + require_NoError(t, err) + }, + ) +}