Skip to content

Commit 1d0bdfb

Browse files
Cherry-pick de-flakes for 2.10.25-RC.1 (#6349)
Deponds on #6345 Includes the following de-flakes: - #6329 - #6330 - #6331 - #6332 - #6334 And this data race fix: - #6150 Signed-off-by: Maurice van Veen <[email protected]>
2 parents 3b6b9c8 + d10e9f7 commit 1d0bdfb

File tree

5 files changed

+49
-9
lines changed

5 files changed

+49
-9
lines changed

server/jetstream_cluster_2_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4219,7 +4219,7 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) {
42194219
d := tr.Sub(start)
42204220
// Adjust start for next calcs.
42214221
start = start.Add(d)
4222-
if d < expected[i] || d > expected[i]*2 {
4222+
if d < expected[i]-5*time.Millisecond || d > expected[i]*2+5*time.Millisecond {
42234223
t.Fatalf("Timing is off for %d, expected ~%v, but got %v", i, expected[i], d)
42244224
}
42254225
}
@@ -6238,8 +6238,12 @@ func TestJetStreamClusterStreamResetOnExpirationDuringPeerDownAndRestartWithLead
62386238

62396239
// Wait for all messages to expire.
62406240
checkFor(t, 5*time.Second, time.Second, func() error {
6241-
si, err := js.StreamInfo("TEST")
6242-
require_NoError(t, err)
6241+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
6242+
defer cancel()
6243+
si, err := js.StreamInfo("TEST", nats.Context(ctx))
6244+
if err != nil {
6245+
return err
6246+
}
62436247
if si.State.Msgs == 0 {
62446248
return nil
62456249
}
@@ -6277,9 +6281,11 @@ func TestJetStreamClusterStreamResetOnExpirationDuringPeerDownAndRestartWithLead
62776281
}
62786282

62796283
// Now move the leader there and double check, but above test is sufficient.
6280-
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
6284+
checkFor(t, 30*time.Second, 250*time.Millisecond, func() error {
62816285
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
6282-
require_NoError(t, err)
6286+
if err != nil {
6287+
return err
6288+
}
62836289
c.waitOnStreamLeader("$G", "TEST")
62846290
if c.streamLeader("$G", "TEST") == nsl {
62856291
return nil

server/jetstream_cluster_4_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3603,7 +3603,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
36033603
for _, n := range server.raftNodes {
36043604
rn := n.(*raft)
36053605
if rn.accName == "$G" {
3606+
rn.Lock()
36063607
rn.updateLeader(noLeader)
3608+
rn.Unlock()
36073609
}
36083610
}
36093611

server/jetstream_helpers_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,27 @@ func (sc *supercluster) waitOnLeader() {
556556
sc.t.Fatalf("Expected a cluster leader, got none")
557557
}
558558

559+
func (sc *supercluster) waitOnAccount(account string) {
560+
sc.t.Helper()
561+
expires := time.Now().Add(40 * time.Second)
562+
for time.Now().Before(expires) {
563+
found := true
564+
for _, c := range sc.clusters {
565+
for _, s := range c.servers {
566+
acc, err := s.fetchAccount(account)
567+
found = found && err == nil && acc != nil
568+
}
569+
}
570+
if found {
571+
return
572+
}
573+
time.Sleep(100 * time.Millisecond)
574+
continue
575+
}
576+
577+
sc.t.Fatalf("Expected account %q to exist but didn't", account)
578+
}
579+
559580
func (sc *supercluster) waitOnAllCurrent() {
560581
sc.t.Helper()
561582
for _, c := range sc.clusters {

server/jetstream_jwt_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ func TestJetStreamJWTMove(t *testing.T) {
325325
require_False(t, s.JetStreamEnabled())
326326
updateJwt(t, s.ClientURL(), sysCreds, accJwt, 10)
327327

328+
sc.waitOnAccount(aExpPub)
329+
328330
s = sc.serverByName("C2-S1")
329331
require_False(t, s.JetStreamEnabled())
330332

@@ -609,6 +611,8 @@ func TestJetStreamJWTClusteredTiersChange(t *testing.T) {
609611
updateJwt(t, c.randomServer().ClientURL(), sysCreds, sysJwt, 3)
610612
updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt1, 3)
611613

614+
c.waitOnAccount(aExpPub)
615+
612616
nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds))
613617
defer nc.Close()
614618

@@ -693,6 +697,8 @@ func TestJetStreamJWTClusteredDeleteTierWithStreamAndMove(t *testing.T) {
693697
updateJwt(t, c.randomServer().ClientURL(), sysCreds, sysJwt, 3)
694698
updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt1, 3)
695699

700+
c.waitOnAccount(aExpPub)
701+
696702
nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds))
697703
defer nc.Close()
698704

server/jetstream_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8536,10 +8536,14 @@ func TestJetStreamNextMsgNoInterest(t *testing.T) {
85368536
}
85378537
}
85388538
nc.Flush()
8539-
ostate := o.info()
8540-
if ostate.AckFloor.Stream != 11 || ostate.NumAckPending > 0 {
8541-
t.Fatalf("Inconsistent ack state: %+v", ostate)
8542-
}
8539+
8540+
checkFor(t, time.Second, 10*time.Millisecond, func() error {
8541+
ostate := o.info()
8542+
if ostate.AckFloor.Stream != 11 || ostate.NumAckPending > 0 {
8543+
return fmt.Errorf("Inconsistent ack state: %+v", ostate)
8544+
}
8545+
return nil
8546+
})
85438547
})
85448548
}
85458549
}
@@ -23018,6 +23022,7 @@ func TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg(t *testing.T) {
2301823022
o := mset.lookupConsumer("CONSUMER")
2301923023

2302023024
requireExpected := func(expected int64) {
23025+
t.Helper()
2302123026
checkFor(t, time.Second, 10*time.Millisecond, func() error {
2302223027
o.mu.RLock()
2302323028
npc := o.npc

0 commit comments

Comments
 (0)