Skip to content

Commit f70721d

Browse files
De-flake setup of mem WQ restart test
Signed-off-by: Maurice van Veen <[email protected]>
1 parent d323e23 commit f70721d

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

server/norace_test.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10729,27 +10729,42 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t
1072910729
nc, js := jsClientConnect(t, c.randomServer())
1073010730
defer nc.Close()
1073110731

10732-
_, err := js.AddStream(&nats.StreamConfig{
10733-
Name: fmt.Sprintf("TEST:%d", n),
10734-
Storage: nats.MemoryStorage,
10735-
Retention: nats.WorkQueuePolicy,
10736-
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
10737-
Replicas: 3,
10738-
}, nats.MaxWait(30*time.Second))
10739-
require_NoError(t, err)
10732+
checkFor(t, 5*time.Second, time.Second, func() error {
10733+
_, err := js.AddStream(&nats.StreamConfig{
10734+
Name: fmt.Sprintf("TEST:%d", n),
10735+
Storage: nats.MemoryStorage,
10736+
Retention: nats.WorkQueuePolicy,
10737+
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
10738+
Replicas: 3,
10739+
}, nats.MaxWait(time.Second))
10740+
return err
10741+
})
10742+
1074010743
subj := fmt.Sprintf("foo.%d.bar", n)
1074110744
for i := 0; i < 22; i++ {
10742-
js.Publish(subj, nil)
10745+
checkFor(t, 5*time.Second, time.Second, func() error {
10746+
_, err := js.Publish(subj, nil)
10747+
return err
10748+
})
1074310749
}
10744-
// Now consumer them all as well.
10745-
sub, err := js.PullSubscribe(subj, "wq")
10746-
require_NoError(t, err)
10747-
msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second))
10748-
require_NoError(t, err)
10750+
// Now consume them all as well.
10751+
var err error
10752+
var sub *nats.Subscription
10753+
checkFor(t, 5*time.Second, time.Second, func() error {
10754+
sub, err = js.PullSubscribe(subj, "wq")
10755+
return err
10756+
})
10757+
10758+
var msgs []*nats.Msg
10759+
checkFor(t, 5*time.Second, time.Second, func() error {
10760+
msgs, err = sub.Fetch(22, nats.MaxWait(time.Second))
10761+
return err
10762+
})
1074910763
require_Equal(t, len(msgs), 22)
1075010764
for _, m := range msgs {
10751-
err := m.AckSync()
10752-
require_NoError(t, err)
10765+
checkFor(t, 5*time.Second, time.Second, func() error {
10766+
return m.AckSync()
10767+
})
1075310768
}
1075410769
}(i)
1075510770
}

0 commit comments

Comments
 (0)