Skip to content

Correctly handle updates to AllowMsgTTL on streams #6922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,13 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return err
}

// Create or delete the THW if needed.
if cfg.AllowMsgTTL && fs.ttls == nil {
fs.ttls = thw.NewHashWheel()
} else if !cfg.AllowMsgTTL && fs.ttls != nil {
fs.ttls = nil
}

// Limits checks and enforcement.
fs.enforceMsgLimit()
fs.enforceBytesLimit()
Expand All @@ -666,7 +673,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
}
fs.mu.Unlock()

if cfg.MaxAge != 0 {
if cfg.MaxAge != 0 || cfg.AllowMsgTTL {
fs.expireMsgs()
}
return nil
Expand Down
20 changes: 20 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9607,3 +9607,23 @@ func TestFileStoreAccessTimeSpinUp(t *testing.T) {
ngra := runtime.NumGoroutine()
require_Equal(t, ngr, ngra)
}

func TestFileStoreUpdateConfigTTLState(t *testing.T) {
cfg := StreamConfig{
Name: "zzz",
Subjects: []string{">"},
Storage: FileStorage,
}
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
require_NoError(t, err)
defer fs.Stop()
require_Equal(t, fs.ttls, nil)

cfg.AllowMsgTTL = true
require_NoError(t, fs.UpdateConfig(&cfg))
require_NotEqual(t, fs.ttls, nil)

cfg.AllowMsgTTL = false
require_NoError(t, fs.UpdateConfig(&cfg))
require_Equal(t, fs.ttls, nil)
}
8 changes: 7 additions & 1 deletion server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {

ms.mu.Lock()
ms.cfg = *cfg
// Create or delete the THW if needed.
if cfg.AllowMsgTTL && ms.ttls == nil {
ms.ttls = thw.NewHashWheel()
} else if !cfg.AllowMsgTTL && ms.ttls != nil {
ms.ttls = nil
}
// Limits checks and enforcement.
ms.enforceMsgLimit()
ms.enforceBytesLimit()
Expand Down Expand Up @@ -112,7 +118,7 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
}
ms.mu.Unlock()

if cfg.MaxAge != 0 {
if cfg.MaxAge != 0 || cfg.AllowMsgTTL {
ms.expireMsgs()
}
return nil
Expand Down
20 changes: 20 additions & 0 deletions server/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,26 @@ func TestMemStoreAllLastSeqs(t *testing.T) {
require_True(t, reflect.DeepEqual(seqs, expected))
}

func TestMemStoreUpdateConfigTTLState(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{">"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
defer ms.Stop()
require_Equal(t, ms.ttls, nil)

cfg.AllowMsgTTL = true
require_NoError(t, ms.UpdateConfig(cfg))
require_NotEqual(t, ms.ttls, nil)

cfg.AllowMsgTTL = false
require_NoError(t, ms.UpdateConfig(cfg))
require_Equal(t, ms.ttls, nil)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
48 changes: 48 additions & 0 deletions server/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server
import (
"fmt"
"testing"
"time"
)

func testAllStoreAllPermutations(t *testing.T, compressionAndEncryption bool, cfg StreamConfig, fn func(t *testing.T, fs StreamStore)) {
Expand Down Expand Up @@ -535,3 +536,50 @@ func TestStorePurgeExZero(t *testing.T) {
},
)
}

func TestStoreUpdateConfigTTLState(t *testing.T) {
config := func() StreamConfig {
return StreamConfig{Name: "TEST", Subjects: []string{"foo"}}
}
testAllStoreAllPermutations(
t, false, config(),
func(t *testing.T, fs StreamStore) {
cfg := config()
switch fs.(type) {
case *fileStore:
cfg.Storage = FileStorage
case *memStore:
cfg.Storage = MemoryStorage
}

// TTLs disabled at this point so this message should survive.
seq, _, err := fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_NoError(t, err)

// Now enable TTLs.
cfg.AllowMsgTTL = true
require_NoError(t, fs.UpdateConfig(&cfg))

// TTLs enabled at this point so this message should be cleaned up.
seq, _, err = fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_Error(t, err)

// Now disable TTLs again.
cfg.AllowMsgTTL = false
require_NoError(t, fs.UpdateConfig(&cfg))

// TTLs disabled again so this message should survive.
seq, _, err = fs.StoreMsg("foo", nil, nil, 1)
require_NoError(t, err)
time.Sleep(2 * time.Second)
_, err = fs.LoadMsg(seq, nil)
require_NoError(t, err)
},
)
}
Loading