Skip to content

Commit 3ce469a

Browse files
committed
Avoid filestore corruption by rejectinglarge publishes into JetStream
We have an effective and seemingly undocumented limit of 32MB for messages published into JetStream as `indexCacheBuf` in the filestore rejects any records that exceed this length. We did not catch this on publish however, and then `indexCacheBuf` would later error when consuming etc. Signed-off-by: Neil Twigg <[email protected]>
1 parent c449a96 commit 3ce469a

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

server/filestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5976,7 +5976,7 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
59765976

59775977
// Get size for this message.
59785978
rl := fileStoreMsgSize(subj, hdr, msg)
5979-
if rl&hbit != 0 {
5979+
if rl&hbit != 0 || rl > rlBadThresh {
59805980
return 0, ErrMsgTooLarge
59815981
}
59825982
// Grab our current last message block.

server/jetstream_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19755,3 +19755,35 @@ func TestJetStreamTHWExpireTasksRace(t *testing.T) {
1975519755
})
1975619756
}
1975719757
}
19758+
19759+
func TestJetStreamRejectLargePublishes(t *testing.T) {
19760+
tdir := t.TempDir()
19761+
19762+
// The test relies on the MaxPayload being larger than the
19763+
// rlBadThresh, otherwise you can't publish a message large
19764+
// enough.
19765+
conf := createConfFile(t, []byte(fmt.Sprintf(`
19766+
listen: 127.0.0.1:-1
19767+
max_payload: %d
19768+
jetstream: {store_dir: %q}
19769+
`, rlBadThresh+2048, tdir)))
19770+
19771+
s, _ := RunServerWithConfig(conf)
19772+
defer s.Shutdown()
19773+
19774+
nc, js := jsClientConnect(t, s)
19775+
defer nc.Close()
19776+
19777+
_, err := js.AddStream(&nats.StreamConfig{
19778+
Name: "TEST",
19779+
Subjects: []string{"test"},
19780+
})
19781+
require_NoError(t, err)
19782+
19783+
_, err = js.Publish("test", make([]byte, rlBadThresh-1024))
19784+
require_NoError(t, err)
19785+
19786+
_, err = js.Publish("test", make([]byte, rlBadThresh+1024))
19787+
require_Error(t, err)
19788+
require_Contains(t, err.Error(), ErrMsgTooLarge.Error())
19789+
}

0 commit comments

Comments
 (0)