Skip to content

Commit 519943f

Browse files
Add support for "never expires" TTL (#6319)
A message with `Nats-TTL: never` will never expire, even if the `MaxAge` policy is set. Signed-off-by: Neil Twigg <[email protected]>
2 parents 2ff42be + e1e8b64 commit 519943f

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

server/filestore.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5304,7 +5304,15 @@ func (fs *fileStore) expireMsgs() {
53045304
fs.mu.RUnlock()
53055305

53065306
if maxAge > 0 {
5307-
for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
5307+
var seq uint64
5308+
for sm, seq, _ = fs.LoadNextMsg(fwcs, true, 0, &smv); sm != nil && sm.ts <= minAge; sm, seq, _ = fs.LoadNextMsg(fwcs, true, seq+1, &smv) {
5309+
if len(sm.hdr) > 0 {
5310+
if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 {
5311+
// The message has a negative TTL, therefore it must "never expire".
5312+
minAge = time.Now().UnixNano() - maxAge
5313+
continue
5314+
}
5315+
}
53085316
fs.mu.Lock()
53095317
fs.removeMsgViaLimits(sm.seq)
53105318
fs.mu.Unlock()

server/jetstream_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25016,3 +25016,51 @@ func TestJetStreamMessageTTLNotUpdatable(t *testing.T) {
2501625016
})
2501725017
require_Error(t, err)
2501825018
}
25019+
25020+
func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
25021+
s := RunBasicJetStreamServer(t)
25022+
defer s.Shutdown()
25023+
25024+
nc, js := jsClientConnect(t, s)
25025+
defer nc.Close()
25026+
25027+
jsStreamCreate(t, nc, &StreamConfig{
25028+
Name: "TEST",
25029+
Storage: FileStorage,
25030+
Subjects: []string{"test"},
25031+
AllowMsgTTL: true,
25032+
MaxAge: time.Second,
25033+
})
25034+
25035+
msg := &nats.Msg{
25036+
Subject: "test",
25037+
Header: nats.Header{},
25038+
}
25039+
25040+
// The first message we publish is set to "never expire", therefore it
25041+
// won't age out with the MaxAge policy.
25042+
msg.Header.Set("Nats-TTL", "never")
25043+
_, err := js.PublishMsg(msg)
25044+
require_NoError(t, err)
25045+
25046+
// Following messages will be published as normal and will age out.
25047+
msg.Header.Del("Nats-TTL")
25048+
for i := 1; i <= 10; i++ {
25049+
_, err := js.PublishMsg(msg)
25050+
require_NoError(t, err)
25051+
}
25052+
25053+
si, err := js.StreamInfo("TEST")
25054+
require_NoError(t, err)
25055+
require_Equal(t, si.State.Msgs, 11)
25056+
require_Equal(t, si.State.FirstSeq, 1)
25057+
require_Equal(t, si.State.LastSeq, 11)
25058+
25059+
time.Sleep(time.Second * 2)
25060+
25061+
si, err = js.StreamInfo("TEST")
25062+
require_NoError(t, err)
25063+
require_Equal(t, si.State.Msgs, 1)
25064+
require_Equal(t, si.State.FirstSeq, 1)
25065+
require_Equal(t, si.State.LastSeq, 11)
25066+
}

server/stream.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4220,12 +4220,16 @@ func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
42204220
// Fast lookup of the message TTL:
42214221
// - Positive return value: duration in seconds.
42224222
// - Zero return value: no TTL or parse error.
4223+
// - Negative return value: never expires.
42234224
func getMessageTTL(hdr []byte) (int64, error) {
42244225
ttl := getHeader(JSMessageTTL, hdr)
42254226
if len(ttl) == 0 {
42264227
return 0, nil
42274228
}
42284229
sttl := bytesToString(ttl)
4230+
if strings.ToLower(sttl) == "never" {
4231+
return -1, nil
4232+
}
42294233
dur, err := time.ParseDuration(sttl)
42304234
if err == nil {
42314235
if dur < time.Second {
@@ -4235,6 +4239,10 @@ func getMessageTTL(hdr []byte) (int64, error) {
42354239
}
42364240
t := parseInt64(ttl)
42374241
if t < 0 {
4242+
// This probably means a parse failure, hence why
4243+
// we have a special case "never" for returning -1.
4244+
// Otherwise we can't know if it's a genuine TTL
4245+
// that says never expire or if it's a parse error.
42384246
return 0, NewJSMessageTTLInvalidError()
42394247
}
42404248
return t, nil

0 commit comments

Comments
 (0)