Skip to content

Commit 19b7cc6

Browse files
derekcollison, MauriceVanVeen
authored and committed
[FIXED] Subject state consistency (#6226)
Subject state would not always remain consistent given a specific pattern of message removals. There were three bugs:

- `recalculateFirstForSubj` in memstore would start at `startSeq+1`, but in filestore it would always just start at `mb.first.seq`. These are now consistent.
- `recalculateFirstForSubj` was not called when `ss.Msgs == 1`, which could mean we had a stale `ss.First` if it needed to be recalculated.
- If after recalculation it turns out `ss.First` equals the message we're trying to remove, we need to call `recalculateFirstForSubj` again, since `ss.Last` is also lazy and could be incorrect.

Apart from that, filestore and memstore are now equivalent in that both first update the per-subject state and then remove the message, and in how `removeSeqPerSubject` updates the subject state.

Signed-off-by: Maurice van Veen <[email protected]>
1 parent bf90cfb commit 19b7cc6

File tree

3 files changed

+130
-19
lines changed

3 files changed

+130
-19
lines changed

server/filestore.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2616,10 +2616,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
26162616
// Always reset.
26172617
ss.First, ss.Last, ss.Msgs = 0, 0, 0
26182618

2619-
if filter == _EMPTY_ {
2620-
filter = fwcs
2621-
}
2622-
26232619
// We do need to figure out the first and last sequences.
26242620
wc := subjectHasWildcard(filter)
26252621
start, stop := uint32(math.MaxUint32), uint32(0)
@@ -7391,6 +7387,9 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
73917387
// Update fss
73927388
smb.removeSeqPerSubject(sm.subj, mseq)
73937389
fs.removePerSubject(sm.subj)
7390+
// Need to mark the sequence as deleted. Otherwise, recalculating ss.First
7391+
// for per-subject info would be able to find it still.
7392+
smb.dmap.Insert(mseq)
73947393
}
73957394
}
73967395

@@ -7838,11 +7837,16 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
78387837

78397838
// Only one left.
78407839
if ss.Msgs == 1 {
7841-
if seq == ss.Last {
7842-
ss.Last = ss.First
7843-
} else {
7844-
ss.First = ss.Last
7840+
// Update first if we need to, we must check if this removal is about what's going to be ss.First
7841+
if ss.firstNeedsUpdate {
7842+
mb.recalculateFirstForSubj(subj, ss.First, ss)
78457843
}
7844+
// If we're removing the first message, we must recalculate again.
7845+
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
7846+
if ss.First == seq {
7847+
mb.recalculateFirstForSubj(subj, ss.First, ss)
7848+
}
7849+
ss.Last = ss.First
78467850
ss.firstNeedsUpdate = false
78477851
return
78487852
}
@@ -7872,8 +7876,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
78727876
startSlot = 0
78737877
}
78747878

7879+
fseq := startSeq + 1
7880+
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
7881+
fseq = mbFseq
7882+
}
78757883
var le = binary.LittleEndian
7876-
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
7884+
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
78777885
bi := mb.cache.idx[slot] &^ hbit
78787886
if bi == dbit {
78797887
// delete marker so skip.

server/memstore.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,8 +1009,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
10091009
if sm := ms.msgs[seq]; sm != nil {
10101010
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
10111011
purged++
1012-
delete(ms.msgs, seq)
10131012
ms.removeSeqPerSubject(sm.subj, seq)
1013+
// Must delete message after updating per-subject info, to be consistent with file store.
1014+
delete(ms.msgs, seq)
10141015
}
10151016
}
10161017
if purged > ms.state.Msgs {
@@ -1098,8 +1099,9 @@ func (ms *memStore) Truncate(seq uint64) error {
10981099
if sm := ms.msgs[i]; sm != nil {
10991100
purged++
11001101
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
1101-
delete(ms.msgs, i)
11021102
ms.removeSeqPerSubject(sm.subj, i)
1103+
// Must delete message after updating per-subject info, to be consistent with file store.
1104+
delete(ms.msgs, i)
11031105
}
11041106
}
11051107
// Reset last.
@@ -1360,17 +1362,24 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
13601362
}
13611363
ss.Msgs--
13621364

1363-
// If we know we only have 1 msg left don't need to search for next first.
1365+
// Only one left.
13641366
if ss.Msgs == 1 {
1365-
if seq == ss.Last {
1366-
ss.Last = ss.First
1367-
} else {
1368-
ss.First = ss.Last
1367+
// Update first if we need to, we must check if this removal is about what's going to be ss.First
1368+
if ss.firstNeedsUpdate {
1369+
ms.recalculateFirstForSubj(subj, ss.First, ss)
13691370
}
1371+
// If we're removing the first message, we must recalculate again.
1372+
// ss.Last is lazy as well, so need to calculate new ss.First and set ss.Last to it.
1373+
if ss.First == seq {
1374+
ms.recalculateFirstForSubj(subj, ss.First, ss)
1375+
}
1376+
ss.Last = ss.First
13701377
ss.firstNeedsUpdate = false
1371-
} else {
1372-
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
1378+
return
13731379
}
1380+
1381+
// We can lazily calculate the first sequence when needed.
1382+
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
13741383
}
13751384

13761385
// Will recalculate the first sequence for this subject in this block.
@@ -1403,7 +1412,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
14031412

14041413
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
14051414

1406-
delete(ms.msgs, seq)
14071415
if ms.state.Msgs > 0 {
14081416
ms.state.Msgs--
14091417
if ss > ms.state.Bytes {
@@ -1428,6 +1436,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
14281436

14291437
// Remove any per subject tracking.
14301438
ms.removeSeqPerSubject(sm.subj, seq)
1439+
// Must delete message after updating per-subject info, to be consistent with file store.
1440+
delete(ms.msgs, seq)
14311441

14321442
if ms.scb != nil {
14331443
// We do not want to hold any locks here.

server/store_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,96 @@ func TestStoreMaxMsgsPerUpdateBug(t *testing.T) {
185185
},
186186
)
187187
}
188+
189+
func TestStoreSubjectStateConsistency(t *testing.T) {
190+
testAllStoreAllPermutations(
191+
t, false,
192+
StreamConfig{Name: "TEST", Subjects: []string{"foo"}},
193+
func(t *testing.T, fs StreamStore) {
194+
getSubjectState := func() SimpleState {
195+
t.Helper()
196+
ss := fs.SubjectsState("foo")
197+
return ss["foo"]
198+
}
199+
200+
// Publish an initial batch of messages.
201+
for i := 0; i < 4; i++ {
202+
_, _, err := fs.StoreMsg("foo", nil, nil)
203+
require_NoError(t, err)
204+
}
205+
// Expect 4 msgs, with first=1, last=4.
206+
ss := getSubjectState()
207+
require_Equal(t, ss.Msgs, 4)
208+
require_Equal(t, ss.First, 1)
209+
require_Equal(t, ss.Last, 4)
210+
211+
// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
212+
removed, err := fs.RemoveMsg(1)
213+
require_NoError(t, err)
214+
require_True(t, removed)
215+
216+
// Will update first, so corrects to seq 2.
217+
ss = getSubjectState()
218+
require_Equal(t, ss.Msgs, 3)
219+
require_Equal(t, ss.First, 2)
220+
require_Equal(t, ss.Last, 4)
221+
222+
// Remove last message.
223+
removed, err = fs.RemoveMsg(4)
224+
require_NoError(t, err)
225+
require_True(t, removed)
226+
227+
// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
228+
ss = getSubjectState()
229+
require_Equal(t, ss.Msgs, 2)
230+
require_Equal(t, ss.First, 2)
231+
require_Equal(t, ss.Last, 4)
232+
233+
// Remove first message again.
234+
removed, err = fs.RemoveMsg(2)
235+
require_NoError(t, err)
236+
require_True(t, removed)
237+
238+
// Since we only have one message left, must update ss.First and set ss.Last to equal.
239+
ss = getSubjectState()
240+
require_Equal(t, ss.Msgs, 1)
241+
require_Equal(t, ss.First, 3)
242+
require_Equal(t, ss.Last, 3)
243+
244+
// Publish some more messages so we can test another scenario.
245+
for i := 0; i < 3; i++ {
246+
_, _, err := fs.StoreMsg("foo", nil, nil)
247+
require_NoError(t, err)
248+
}
249+
250+
// Just check the state is complete again.
251+
ss = getSubjectState()
252+
require_Equal(t, ss.Msgs, 4)
253+
require_Equal(t, ss.First, 3)
254+
require_Equal(t, ss.Last, 7)
255+
256+
// Remove last sequence, ss.Last is lazy so doesn't get updated.
257+
removed, err = fs.RemoveMsg(7)
258+
require_NoError(t, err)
259+
require_True(t, removed)
260+
261+
// Remove first sequence, ss.First is lazy so doesn't get updated.
262+
removed, err = fs.RemoveMsg(3)
263+
require_NoError(t, err)
264+
require_True(t, removed)
265+
266+
// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
267+
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
268+
// yet again, since ss.Last is lazy and is not correct.
269+
removed, err = fs.RemoveMsg(5)
270+
require_NoError(t, err)
271+
require_True(t, removed)
272+
273+
// ss.First should equal ss.Last, last should have been updated now.
274+
ss = getSubjectState()
275+
require_Equal(t, ss.Msgs, 1)
276+
require_Equal(t, ss.First, 6)
277+
require_Equal(t, ss.Last, 6)
278+
},
279+
)
280+
}

0 commit comments

Comments
 (0)