diff --git a/server/norace_test.go b/server/norace_test.go index f8dd3de9c0c..8cb4e1f7e9c 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8517,13 +8517,16 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"} // S1 - conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622") + // The route connection to S2 must be through a slow proxy + np12 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:15622", true) + routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np12.routeURL()) + conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, routes) c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf))) // S2 - // Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT. - np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) - routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL()) + // The route connection to S1 must be through a slow proxy + np21 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) + routes = fmt.Sprintf("%s, route://127.0.0.1:16622", np21.routeURL()) conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes) c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf))) @@ -8534,13 +8537,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. c.checkClusterFormed() c.waitOnClusterReady() defer c.shutdown() - defer np.stop() + defer np12.stop() + defer np21.stop() - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() + slow := c.servers[0] // Expecting pre-acks here. + sl := c.servers[1] // Stream leader, will publish here. + cl := c.servers[2] // Consumer leader, will consume & ack here. + + snc, sjs := jsClientConnect(t, sl) + defer snc.Close() + + cnc, cjs := jsClientConnect(t, cl) + defer cnc.Close() // Now create the stream. - _, err := js.AddStream(&nats.StreamConfig{ + _, err := sjs.AddStream(&nats.StreamConfig{ Name: "EVENTS", Subjects: []string{"EV.>"}, Replicas: 3, @@ -8549,7 +8560,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. require_NoError(t, err) // Make sure it's leader is on S2. - sl := c.servers[1] checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnStreamLeader(globalAccountName, "EVENTS") if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl { @@ -8560,7 +8570,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. }) // Now create the consumer. - _, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{ + _, err = sjs.AddConsumer("EVENTS", &nats.ConsumerConfig{ Durable: "C", AckPolicy: nats.AckExplicitPolicy, DeliverSubject: "dx", @@ -8568,7 +8578,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. require_NoError(t, err) // Make sure the consumer leader is on S3. - cl := c.servers[2] checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C") if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl { @@ -8578,37 +8587,36 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. return nil }) - // Create the real consumer on the consumer leader to make it efficient. - nc, js = jsClientConnect(t, cl) - defer nc.Close() - - _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) { + _, err = cjs.Subscribe(_EMPTY_, func(msg *nats.Msg) { msg.Ack() }, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck()) require_NoError(t, err) + // Publish directly on the stream leader to make it efficient. for i := 0; i < 1_000; i++ { - _, err := js.PublishAsync("EVENTS.PAID", []byte("ok")) + _, err := sjs.PublishAsync("EV.PAID", []byte("ok")) require_NoError(t, err) } select { - case <-js.PublishAsyncComplete(): + case <-sjs.PublishAsyncComplete(): case <-time.After(5 * time.Second): t.Fatalf("Did not receive completion signal") } - slow := c.servers[0] mset, err := slow.GlobalAccount().lookupStream("EVENTS") require_NoError(t, err) - // Make sure preAck is non-nil, so we know the logic has kicked in. - mset.mu.RLock() - preAcks := mset.preAcks - mset.mu.RUnlock() - require_NotNil(t, preAcks) - - checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { state := mset.state() + if state.LastSeq != 1000 { + return fmt.Errorf("Haven't received all messages yet (last seq %d)", state.LastSeq) + } + mset.mu.RLock() + preAcks := mset.preAcks + mset.mu.RUnlock() + if preAcks == nil { + return fmt.Errorf("Expected to have preAcks by now") + } if state.Msgs == 0 { mset.mu.RLock() lp := len(mset.preAcks) diff --git a/server/test_test.go b/server/test_test.go index f3003b1edb2..7985389d144 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -19,6 +19,7 @@ import ( "math/rand" "net/url" "os" + "reflect" "strings" "testing" "time" @@ -75,10 +76,15 @@ func require_NoError(t testing.TB, err error) { } } -func require_NotNil(t testing.TB, v any) { +func require_NotNil[T any](t testing.TB, v T) { t.Helper() - if v == nil { - t.Fatalf("require not nil, but got: %v", v) + r := reflect.ValueOf(v) + switch k := r.Kind(); k { + case reflect.Ptr, reflect.Interface, reflect.Slice, + reflect.Map, reflect.Chan, reflect.Func: + if r.IsNil() { + t.Fatalf("require not nil, but got nil") + } } }