
Commit 6177130

[FIXED] Clear all pre-acks for seq upon removing message (#6325)

If a message in a stream was removed, erased, or purged, the pre-acks for that sequence were not cleared. Stale pre-acks could therefore remain and potentially result in a memory leak. There was also a call to `mset.clearAllPreAcks(last)` where the write lock was not held. `mset.setLastSeq` now requires the write lock to already be held, so we don't need to lock/unlock multiple times in a row.

Also de-flakes `TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers`, which could fail due to:
- remaining pre-acks that should have been cleared
- messages being removed asynchronously, because removals go through proposals (added `checkFor`)

Signed-off-by: Maurice van Veen <[email protected]>
2 parents: d3bcbfc + 6a0f758
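For context: a "pre-ack" is an ack a replica receives for a sequence it has not stored yet. Below is a minimal sketch of the bookkeeping this commit fixes, assuming a map keyed by sequence; the field and helper names mirror the diffs that follow, but the exact types are an illustration, not the server's verbatim code.

```go
package main

import "sync"

// consumer is a stand-in for the server's consumer type.
type consumer struct{ name string }

// stream holds the pre-ack bookkeeping (sketch).
type stream struct {
	mu      sync.RWMutex
	lseq    uint64
	preAcks map[uint64]map[*consumer]struct{}
}

// registerPreAckLock records an ack that arrived before the message at
// seq was stored locally.
func (mset *stream) registerPreAckLock(o *consumer, seq uint64) {
	mset.mu.Lock()
	defer mset.mu.Unlock()
	if mset.preAcks == nil {
		mset.preAcks = make(map[uint64]map[*consumer]struct{})
	}
	if mset.preAcks[seq] == nil {
		mset.preAcks[seq] = make(map[*consumer]struct{})
	}
	mset.preAcks[seq][o] = struct{}{}
}

// clearAllPreAcks drops all pre-acks for a sequence. Write lock should be
// held. Skipping this on remove/erase/purge is exactly the leak described
// above: entries for deleted sequences would stay in the map forever.
func (mset *stream) clearAllPreAcks(seq uint64) {
	delete(mset.preAcks, seq)
}
```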

File tree: 4 files changed, +168 −46 lines
- server/jetstream_cluster.go
- server/jetstream_cluster_1_test.go
- server/norace_test.go
- server/stream.go

server/jetstream_cluster.go
Lines changed: 5 additions & 3 deletions

@@ -3098,8 +3098,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 			if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
 				// Skip and update our lseq.
 				last := mset.store.SkipMsg()
+				mset.mu.Lock()
 				mset.setLastSeq(last)
 				mset.clearAllPreAcks(last)
+				mset.mu.Unlock()
 				continue
 			}

@@ -8805,18 +8807,18 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
 		return 0, err
 	}

+	mset.mu.Lock()
+	defer mset.mu.Unlock()
 	// Update our lseq.
 	mset.setLastSeq(seq)

 	// Check for MsgId and if we have one here make sure to update our internal map.
 	if len(hdr) > 0 {
 		if msgId := getMsgId(hdr); msgId != _EMPTY_ {
 			if !ddloaded {
-				mset.mu.Lock()
 				mset.rebuildDedupe()
-				mset.mu.Unlock()
 			}
-			mset.storeMsgId(&ddentry{msgId, seq, ts})
+			mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
 		}
 	}
server/jetstream_cluster_1_test.go
Lines changed: 93 additions & 0 deletions

@@ -6999,6 +6999,99 @@ func TestJetStreamClusterStreamUpscalePeersAfterDownscale(t *testing.T) {
 	checkPeerSet()
 }

+func TestJetStreamClusterClearAllPreAcksOnRemoveMsg(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+		Retention: nats.WorkQueuePolicy,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "CONSUMER",
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 3; i++ {
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
+	}
+
+	// Wait for all servers to converge on the same state.
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	// Register pre-acks on all servers.
+	// Normally this can't happen as the stream leader will have the message that's acked available, just for testing.
+	for _, s := range c.servers {
+		acc, err := s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		o := mset.lookupConsumer("CONSUMER")
+		require_NotNil(t, o)
+
+		// Register pre-acks for the 3 messages.
+		mset.registerPreAckLock(o, 1)
+		mset.registerPreAckLock(o, 2)
+		mset.registerPreAckLock(o, 3)
+	}
+
+	// Check there's an expected amount of pre-acks, and there are no pre-acks for the given sequence.
+	checkPreAcks := func(seq uint64, expected int) {
+		t.Helper()
+		checkFor(t, 5*time.Second, time.Second, func() error {
+			for _, s := range c.servers {
+				acc, err := s.lookupAccount(globalAccountName)
+				if err != nil {
+					return err
+				}
+				mset, err := acc.lookupStream("TEST")
+				if err != nil {
+					return err
+				}
+				mset.mu.RLock()
+				numPreAcks := len(mset.preAcks)
+				numSeqPreAcks := len(mset.preAcks[seq])
+				mset.mu.RUnlock()
+				if numPreAcks != expected {
+					return fmt.Errorf("expected %d pre-acks, got %d", expected, numPreAcks)
+				}
+				if seq > 0 && numSeqPreAcks != 0 {
+					return fmt.Errorf("expected 0 pre-acks for seq %d, got %d", seq, numSeqPreAcks)
+				}
+			}
+			return nil
+		})
+	}
+	// Check all pre-acks were registered.
+	checkPreAcks(0, 3)
+
+	// Deleting the message should clear the pre-ack.
+	err = js.DeleteMsg("TEST", 1)
+	require_NoError(t, err)
+	checkPreAcks(1, 2)
+
+	// Erasing the message should clear the pre-ack.
+	err = js.SecureDeleteMsg("TEST", 2)
+	require_NoError(t, err)
+	checkPreAcks(2, 1)
+
+	// Purging should clear all pre-acks below the purged floor.
+	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 4})
+	require_NoError(t, err)
+	checkPreAcks(3, 0)
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
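The test leans on the repository's `checkFor` helper to poll until all replicas converge rather than asserting once. If you are reading this without the repo's test utilities at hand, a rough sketch of what such a polling helper does (illustrative, not the repo's exact implementation):

```go
package main

import (
	"testing"
	"time"
)

// checkFor polls fn every interval until it returns nil or the timeout
// elapses, then fails the test with the last error. Polling instead of
// asserting once is what de-flakes checks against state that converges
// asynchronously across replicas.
func checkFor(t *testing.T, timeout, interval time.Duration, fn func() error) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("timed out waiting for condition: %v", err)
}
```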

server/norace_test.go
Lines changed: 46 additions & 31 deletions

@@ -7738,32 +7738,47 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
 	// make sure we do not remove prematurely.
 	msgs, err := sub.Fetch(100, nats.MaxWait(time.Second))
 	require_NoError(t, err)
-	require_True(t, len(msgs) == 100)
+	require_Len(t, len(msgs), 100)
 	for _, m := range msgs {
 		m.AckSync()
 	}

 	ci, err := js.ConsumerInfo("EVENTS", "D")
 	require_NoError(t, err)
-	require_True(t, ci.NumPending == uint64(numToSend-100))
-	require_True(t, ci.NumAckPending == 0)
-	require_True(t, ci.Delivered.Stream == 100)
-	require_True(t, ci.AckFloor.Stream == 100)
+	require_Equal(t, ci.NumPending, uint64(numToSend-100))
+	require_Equal(t, ci.NumAckPending, 0)
+	require_Equal(t, ci.Delivered.Stream, 100)
+	require_Equal(t, ci.AckFloor.Stream, 100)

 	// Check stream state on all servers.
-	for _, s := range c.servers {
-		mset, err := s.GlobalAccount().lookupStream("EVENTS")
-		require_NoError(t, err)
-		state := mset.state()
-		require_True(t, state.Msgs == 900)
-		require_True(t, state.FirstSeq == 101)
-		require_True(t, state.LastSeq == 1000)
-		require_True(t, state.Consumers == 2)
-	}
+	// Since acks result in messages to be removed through proposals,
+	// it could take some time to be reflected in the stream state.
+	checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+		for _, s := range c.servers {
+			mset, err := s.GlobalAccount().lookupStream("EVENTS")
+			if err != nil {
+				return err
+			}
+			state := mset.state()
+			if state.Msgs != 900 {
+				return fmt.Errorf("expected state.Msgs=900, got %d", state.Msgs)
+			}
+			if state.FirstSeq != 101 {
+				return fmt.Errorf("expected state.FirstSeq=101, got %d", state.FirstSeq)
+			}
+			if state.LastSeq != 1000 {
+				return fmt.Errorf("expected state.LastSeq=1000, got %d", state.LastSeq)
+			}
+			if state.Consumers != 2 {
+				return fmt.Errorf("expected state.Consumers=2, got %d", state.Consumers)
+			}
+		}
+		return nil
+	})

 	msgs, err = sub.Fetch(900, nats.MaxWait(time.Second))
 	require_NoError(t, err)
-	require_True(t, len(msgs) == 900)
+	require_Len(t, len(msgs), 900)
 	for _, m := range msgs {
 		m.AckSync()
 	}

@@ -7776,15 +7791,15 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
 		mset, err := s.GlobalAccount().lookupStream("EVENTS")
 		require_NoError(t, err)
 		state := mset.state()
-		require_True(t, state.Msgs == 0)
-		require_True(t, state.FirstSeq == 1001)
-		require_True(t, state.LastSeq == 1000)
-		require_True(t, state.Consumers == 2)
+		require_Equal(t, state.Msgs, 0)
+		require_Equal(t, state.FirstSeq, 1001)
+		require_Equal(t, state.LastSeq, 1000)
+		require_Equal(t, state.Consumers, 2)
 		// Now check preAcks
 		mset.mu.RLock()
 		numPreAcks := len(mset.preAcks)
 		mset.mu.RUnlock()
-		require_True(t, numPreAcks == 0)
+		require_Len(t, numPreAcks, 0)
 	}
 }

@@ -7872,27 +7887,27 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *te

 	ci, err := js.ConsumerInfo("EVENTS", "D")
 	require_NoError(t, err)
-	require_True(t, ci.NumPending == 0)
-	require_True(t, ci.NumAckPending == 0)
-	require_True(t, ci.Delivered.Consumer == 500)
-	require_True(t, ci.Delivered.Stream == 1000)
-	require_True(t, ci.AckFloor.Consumer == 500)
-	require_True(t, ci.AckFloor.Stream == 1000)
+	require_Equal(t, ci.NumPending, 0)
+	require_Equal(t, ci.NumAckPending, 0)
+	require_Equal(t, ci.Delivered.Consumer, 500)
+	require_Equal(t, ci.Delivered.Stream, 1000)
+	require_Equal(t, ci.AckFloor.Consumer, 500)
+	require_Equal(t, ci.AckFloor.Stream, 1000)

 	// Check final stream state on all servers.
 	for _, s := range c.servers {
 		mset, err := s.GlobalAccount().lookupStream("EVENTS")
 		require_NoError(t, err)
 		state := mset.state()
-		require_True(t, state.Msgs == 0)
-		require_True(t, state.FirstSeq == 1001)
-		require_True(t, state.LastSeq == 1000)
-		require_True(t, state.Consumers == 2)
+		require_Equal(t, state.Msgs, 0)
+		require_Equal(t, state.FirstSeq, 1001)
+		require_Equal(t, state.LastSeq, 1000)
+		require_Equal(t, state.Consumers, 2)
 		// Now check preAcks
 		mset.mu.RLock()
 		numPreAcks := len(mset.preAcks)
 		mset.mu.RUnlock()
-		require_True(t, numPreAcks == 0)
+		require_Len(t, numPreAcks, 0)
 	}
 }
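Most of this hunk swaps `require_True(t, x == y)` for `require_Equal(t, x, y)` and `require_Len`. That is more than style: a bare boolean assertion can only report that it was false, while an equality helper prints both values, which matters when chasing intermittent failures. A sketch of such a helper, assuming Go generics (illustrative; the repo ships its own version):

```go
package main

import "testing"

// require_Equal fails the test and reports both values when they differ,
// which makes triaging flakes far easier than a bare require_True that
// can only say "false".
func require_Equal[T comparable](t *testing.T, actual, expected T) {
	t.Helper()
	if actual != expected {
		t.Fatalf("require equal, got %v, expected %v", actual, expected)
	}
}
```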

server/stream.go
Lines changed: 24 additions & 12 deletions

@@ -1126,10 +1126,10 @@ func (mset *stream) lastSeq() uint64 {
 	return mset.lseq
 }

+// Set last seq.
+// Write lock should be held.
 func (mset *stream) setLastSeq(lseq uint64) {
-	mset.mu.Lock()
 	mset.lseq = lseq
-	mset.mu.Unlock()
 }

 func (mset *stream) sendCreateAdvisory() {

@@ -2188,11 +2188,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
 	store.FastState(&state)
 	fseq, lseq := state.FirstSeq, state.LastSeq

+	mset.mu.Lock()
 	// Check if our last has moved past what our original last sequence was, if so reset.
 	if lseq > mlseq {
 		mset.setLastSeq(lseq)
 	}

+	// Clear any pending acks below first seq.
+	mset.clearAllPreAcksBelowFloor(fseq)
+	mset.mu.Unlock()
+
 	// Purge consumers.
 	// Check for filtered purge.
 	if preq != nil && preq.Subject != _EMPTY_ {

@@ -2239,15 +2244,29 @@ func (mset *stream) deleteMsg(seq uint64) (bool, error) {
 	if mset.closed.Load() {
 		return false, errStreamClosed
 	}
-	return mset.store.RemoveMsg(seq)
+	removed, err := mset.store.RemoveMsg(seq)
+	if err != nil {
+		return removed, err
+	}
+	mset.mu.Lock()
+	mset.clearAllPreAcks(seq)
+	mset.mu.Unlock()
+	return removed, err
 }

 // EraseMsg will securely remove a message and rewrite the data with random data.
 func (mset *stream) eraseMsg(seq uint64) (bool, error) {
 	if mset.closed.Load() {
 		return false, errStreamClosed
 	}
-	return mset.store.EraseMsg(seq)
+	removed, err := mset.store.EraseMsg(seq)
+	if err != nil {
+		return removed, err
+	}
+	mset.mu.Lock()
+	mset.clearAllPreAcks(seq)
+	mset.mu.Unlock()
+	return removed, err
 }

 // Are we a mirror?

@@ -4138,15 +4157,8 @@ func (mset *stream) purgeMsgIds() {
 	}
 }

-// storeMsgId will store the message id for duplicate detection.
-func (mset *stream) storeMsgId(dde *ddentry) {
-	mset.mu.Lock()
-	defer mset.mu.Unlock()
-	mset.storeMsgIdLocked(dde)
-}
-
 // storeMsgIdLocked will store the message id for duplicate detection.
-// Lock should he held.
+// Lock should be held.
 func (mset *stream) storeMsgIdLocked(dde *ddentry) {
 	if mset.ddmap == nil {
 		mset.ddmap = make(map[string]*ddentry)
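Purge is handled differently from single-message removal: everything below the new first sequence disappears at once, so the purge path sweeps by floor (`clearAllPreAcksBelowFloor(fseq)`) instead of by exact sequence. Continuing the illustrative sketch from the top of this page, such a sweep might look like:

```go
// clearAllPreAcksBelowFloor drops pre-acks for every sequence strictly
// below fseq, matching purge semantics where all messages with seq < fseq
// no longer exist. Write lock should be held.
func (mset *stream) clearAllPreAcksBelowFloor(fseq uint64) {
	for seq := range mset.preAcks {
		if seq < fseq {
			delete(mset.preAcks, seq)
		}
	}
}
```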
