Skip to content

Commit 852bfc7

Browse files
committed
Updates based on PR feedback
Signed-off-by: Derek Collison <[email protected]>
1 parent ce76f67 commit 852bfc7

File tree

3 files changed

+29
-163
lines changed

3 files changed

+29
-163
lines changed

server/filestore_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3670,6 +3670,12 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
36703670
// Make sure we have our state file prior to Purge call.
36713671
fs.forceWriteFullState()
36723672

3673+
// Capture the current index.db file.
3674+
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
3675+
buf, err := os.ReadFile(sfile)
3676+
require_NoError(t, err)
3677+
require_True(t, len(buf) > 0)
3678+
36733679
// This should purge all "foo.1"
36743680
p, err := fs.PurgeEx("foo.1", 1, 0)
36753681
require_NoError(t, err)
@@ -3679,12 +3685,6 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
36793685
require_Equal(t, state.Msgs, 2)
36803686
require_Equal(t, state.FirstSeq, 1)
36813687

3682-
// Capture the current index.db file if it exists.
3683-
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
3684-
buf, err := os.ReadFile(sfile)
3685-
require_NoError(t, err)
3686-
require_True(t, len(buf) > 0)
3687-
36883688
// Make sure we can recover same state.
36893689
fs.Stop()
36903690
fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil)

server/memstore.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,23 @@ func (ms *memStore) AllLastSeqs() ([]uint64, error) {
656656
return seqs, nil
657657
}
658658

659+
// Helper to determine if the filter(s) represent all the subjects.
660+
// Most clients send in subjects even if they match the stream's ingest subjects.
661+
// Lock should be held.
662+
func (ms *memStore) filterIsAll(filters []string) bool {
663+
if len(filters) != len(ms.cfg.Subjects) {
664+
return false
665+
}
666+
// Sort so we can compare.
667+
slices.Sort(filters)
668+
for i, subj := range filters {
669+
if !subjectIsSubsetMatch(ms.cfg.Subjects[i], subj) {
670+
return false
671+
}
672+
}
673+
return true
674+
}
675+
659676
// MultiLastSeqs will return a sorted list of sequences that match all subjects presented in filters.
660677
// We will not exceed the maxSeq, which if 0 becomes the store's last sequence.
661678
func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error) {
@@ -666,6 +683,11 @@ func (ms *memStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed in
666683
return nil, nil
667684
}
668685

686+
// See if we can short circuit if we think they are asking for all last sequences and have no maxSeq or maxAllowed set.
687+
if maxSeq == 0 && maxAllowed <= 0 && ms.filterIsAll(filters) {
688+
return ms.AllLastSeqs()
689+
}
690+
669691
// Implied last sequence.
670692
if maxSeq == 0 {
671693
maxSeq = ms.state.LastSeq

server/memstore_test.go

Lines changed: 1 addition & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,7 @@ func TestMemStoreSkipMsgs(t *testing.T) {
921921
func TestMemStoreMultiLastSeqs(t *testing.T) {
922922
cfg := &StreamConfig{
923923
Name: "zzz",
924-
Subjects: []string{"foo.*"},
924+
Subjects: []string{"foo.*", "bar.*"},
925925
Storage: MemoryStorage,
926926
}
927927
ms, err := newMemStore(cfg)
@@ -1259,162 +1259,6 @@ func TestMemStoreSubjectDeleteMarkers(t *testing.T) {
12591259
require_Equal(t, bytesToString(getHeader(JSMessageTTL, im.hdr)), "1s")
12601260
}
12611261

1262-
func TestMemStoreSubjectDeleteMarkersOnPurge(t *testing.T) {
1263-
t.SkipNow()
1264-
1265-
ms, err := newMemStore(
1266-
&StreamConfig{
1267-
Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage,
1268-
MaxAge: time.Second, AllowMsgTTL: true,
1269-
SubjectDeleteMarkerTTL: time.Second,
1270-
},
1271-
)
1272-
require_NoError(t, err)
1273-
defer ms.Stop()
1274-
1275-
for i := 0; i < 10; i++ {
1276-
_, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0)
1277-
require_NoError(t, err)
1278-
}
1279-
1280-
_, err = ms.Purge()
1281-
require_NoError(t, err)
1282-
1283-
for i := uint64(0); i < 10; i++ {
1284-
sm, err := ms.LoadMsg(11+i, nil)
1285-
require_NoError(t, err)
1286-
require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i))
1287-
require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge)
1288-
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
1289-
}
1290-
}
1291-
1292-
func TestMemStoreSubjectDeleteMarkersOnPurgeEx(t *testing.T) {
1293-
t.SkipNow()
1294-
1295-
ms, err := newMemStore(
1296-
&StreamConfig{
1297-
Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage,
1298-
MaxAge: time.Second, AllowMsgTTL: true,
1299-
SubjectDeleteMarkerTTL: time.Second,
1300-
},
1301-
)
1302-
require_NoError(t, err)
1303-
defer ms.Stop()
1304-
1305-
for i := 0; i < 10; i++ {
1306-
_, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0)
1307-
require_NoError(t, err)
1308-
}
1309-
1310-
_, err = ms.PurgeEx("test.*", 1, 0)
1311-
require_NoError(t, err)
1312-
1313-
for i := uint64(0); i < 10; i++ {
1314-
sm, err := ms.LoadMsg(11+i, nil)
1315-
require_NoError(t, err)
1316-
require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i))
1317-
require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge)
1318-
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
1319-
}
1320-
}
1321-
1322-
func TestMemStoreSubjectDeleteMarkersOnPurgeExNoMarkers(t *testing.T) {
1323-
t.SkipNow()
1324-
1325-
ms, err := newMemStore(
1326-
&StreamConfig{
1327-
Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage,
1328-
MaxAge: time.Second, AllowMsgTTL: true,
1329-
SubjectDeleteMarkerTTL: time.Second,
1330-
},
1331-
)
1332-
require_NoError(t, err)
1333-
defer ms.Stop()
1334-
1335-
for i := 0; i < 10; i++ {
1336-
_, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0)
1337-
require_NoError(t, err)
1338-
}
1339-
1340-
_, err = ms.PurgeEx("test.*", 1, 0)
1341-
require_NoError(t, err)
1342-
1343-
for i := uint64(0); i < 10; i++ {
1344-
_, err := ms.LoadMsg(11+i, nil)
1345-
require_Error(t, err)
1346-
}
1347-
}
1348-
1349-
func TestMemStoreSubjectDeleteMarkersOnCompact(t *testing.T) {
1350-
t.SkipNow()
1351-
1352-
ms, err := newMemStore(
1353-
&StreamConfig{
1354-
Name: "zzz", Subjects: []string{"test.*"}, Storage: MemoryStorage,
1355-
MaxAge: time.Second, AllowMsgTTL: true,
1356-
SubjectDeleteMarkerTTL: time.Second,
1357-
},
1358-
)
1359-
require_NoError(t, err)
1360-
defer ms.Stop()
1361-
1362-
for i := 0; i < 10; i++ {
1363-
_, _, err := ms.StoreMsg(fmt.Sprintf("test.%d", i), nil, nil, 0)
1364-
require_NoError(t, err)
1365-
}
1366-
1367-
_, err = ms.Compact(6)
1368-
require_NoError(t, err)
1369-
1370-
for i := uint64(6); i <= 15; i++ {
1371-
sm, err := ms.LoadMsg(i, nil)
1372-
require_NoError(t, err)
1373-
if i <= 10 {
1374-
require_Equal(t, sm.subj, fmt.Sprintf("test.%d", i-1))
1375-
require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), _EMPTY_)
1376-
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), _EMPTY_)
1377-
} else {
1378-
require_Equal(t, sm.subj, fmt.Sprintf("test.%d", 15-i))
1379-
require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonPurge)
1380-
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
1381-
}
1382-
}
1383-
}
1384-
1385-
func TestMemStoreSubjectDeleteMarkersOnRemoveMsg(t *testing.T) {
1386-
t.SkipNow()
1387-
1388-
ms, err := newMemStore(
1389-
&StreamConfig{
1390-
Name: "zzz", Subjects: []string{"test"}, Storage: MemoryStorage,
1391-
MaxAge: time.Second, AllowMsgTTL: true,
1392-
SubjectDeleteMarkerTTL: time.Second,
1393-
},
1394-
)
1395-
require_NoError(t, err)
1396-
defer ms.Stop()
1397-
1398-
_, _, err = ms.StoreMsg("test", nil, nil, 0)
1399-
require_NoError(t, err)
1400-
1401-
_, err = ms.RemoveMsg(1)
1402-
require_NoError(t, err)
1403-
1404-
sm, err := ms.LoadMsg(2, nil)
1405-
require_NoError(t, err)
1406-
require_Equal(t, sm.subj, "test")
1407-
require_Equal(t, bytesToString(getHeader(JSMarkerReason, sm.hdr)), JSMarkerReasonRemove)
1408-
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
1409-
1410-
_, err = ms.RemoveMsg(2)
1411-
require_NoError(t, err)
1412-
1413-
// The deleted subject marker at seq 2 should not have been replaced.
1414-
_, err = ms.LoadMsg(3, nil)
1415-
require_Error(t, err)
1416-
}
1417-
14181262
func TestMemStoreAllLastSeqs(t *testing.T) {
14191263
cfg := &StreamConfig{
14201264
Name: "zzz",

0 commit comments

Comments
 (0)