From bd5af582d8002e940bb99a6d6086bf548f39006c Mon Sep 17 00:00:00 2001
From: Neil Twigg
Date: Wed, 20 Nov 2024 11:02:23 +0000
Subject: [PATCH 1/2] Tests: Fix `require_NotNil`

The previous behaviour of checking whether the interface is `nil` didn't
do the right thing, as a `nil` parameter could still be wrapped in an
interface descriptor that is itself non-`nil`, so use reflection instead.

Signed-off-by: Neil Twigg
---
 server/test_test.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/server/test_test.go b/server/test_test.go
index f3003b1edb2..bd7eb1daf91 100644
--- a/server/test_test.go
+++ b/server/test_test.go
@@ -19,6 +19,7 @@ import (
 	"math/rand"
 	"net/url"
 	"os"
+	"reflect"
 	"strings"
 	"testing"
 	"time"
@@ -75,10 +76,15 @@ func require_NoError(t testing.TB, err error) {
 	}
 }
 
-func require_NotNil(t testing.TB, v any) {
+func require_NotNil[T any](t testing.TB, v T) {
 	t.Helper()
-	if v == nil {
-		t.Fatalf("require not nil, but got: %v", v)
+	r := reflect.ValueOf(v)
+	switch k := r.Kind(); k {
+	case reflect.Ptr, reflect.Interface, reflect.Slice,
+		reflect.Map, reflect.Chan, reflect.Func:
+		if r.IsNil() {
+			t.Fatalf("require not nil, but got: %v", v)
+		}
 	}
 }
 

From 44c32ef139cfc52436b88ce608ed4d284d18f23a Mon Sep 17 00:00:00 2001
From: Neil Twigg
Date: Thu, 21 Nov 2024 18:21:30 +0000
Subject: [PATCH 2/2] Fix `TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck`

There were various things wrong with this test:

1. The routes between S1 and S2 were slow in only one direction
2. The stream listened on one subject but the publisher published on another
3. The final `checkFor` didn't wait to see whether all of the messages had
   even been processed before going on to check the preack state

Signed-off-by: Neil Twigg
---
 server/norace_test.go | 60 ++++++++++++++++++++++++-------------------
 server/test_test.go   |  2 +-
 2 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/server/norace_test.go b/server/norace_test.go
index f8dd3de9c0c..8cb4e1f7e9c 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -8517,13 +8517,16 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 	c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"}
 
 	// S1
-	conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622")
+	// The route connection to S2 must be through a slow proxy
+	np12 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:15622", true)
+	routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np12.routeURL())
+	conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, routes)
 	c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf)))
 
 	// S2
-	// Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT.
-	np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
-	routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL())
+	// The route connection to S1 must be through a slow proxy
+	np21 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true)
+	routes = fmt.Sprintf("%s, route://127.0.0.1:16622", np21.routeURL())
 	conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes)
 	c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf)))
 
@@ -8534,13 +8537,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 	c.checkClusterFormed()
 	c.waitOnClusterReady()
 	defer c.shutdown()
-	defer np.stop()
+	defer np12.stop()
+	defer np21.stop()
 
-	nc, js := jsClientConnect(t, c.randomServer())
-	defer nc.Close()
+	slow := c.servers[0] // Expecting pre-acks here.
+	sl := c.servers[1]   // Stream leader, will publish here.
+	cl := c.servers[2]   // Consumer leader, will consume & ack here.
+
+	snc, sjs := jsClientConnect(t, sl)
+	defer snc.Close()
+
+	cnc, cjs := jsClientConnect(t, cl)
+	defer cnc.Close()
 
 	// Now create the stream.
-	_, err := js.AddStream(&nats.StreamConfig{
+	_, err := sjs.AddStream(&nats.StreamConfig{
 		Name:      "EVENTS",
 		Subjects:  []string{"EV.>"},
 		Replicas:  3,
@@ -8549,7 +8560,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 	require_NoError(t, err)
 
 	// Make sure it's leader is on S2.
-	sl := c.servers[1]
 	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
 		c.waitOnStreamLeader(globalAccountName, "EVENTS")
 		if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
@@ -8560,7 +8570,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 	})
 
 	// Now create the consumer.
-	_, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{
+	_, err = sjs.AddConsumer("EVENTS", &nats.ConsumerConfig{
 		Durable:        "C",
 		AckPolicy:      nats.AckExplicitPolicy,
 		DeliverSubject: "dx",
@@ -8568,7 +8578,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 	require_NoError(t, err)
 
 	// Make sure the consumer leader is on S3.
-	cl := c.servers[2]
 	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
 		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
 		if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
@@ -8578,37 +8587,36 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing.
 		return nil
 	})
 
-	// Create the real consumer on the consumer leader to make it efficient.
-	nc, js = jsClientConnect(t, cl)
-	defer nc.Close()
-
-	_, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) {
+	_, err = cjs.Subscribe(_EMPTY_, func(msg *nats.Msg) {
 		msg.Ack()
 	}, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck())
 	require_NoError(t, err)
 
+	// Publish directly on the stream leader to make it efficient.
 	for i := 0; i < 1_000; i++ {
-		_, err := js.PublishAsync("EVENTS.PAID", []byte("ok"))
+		_, err := sjs.PublishAsync("EV.PAID", []byte("ok"))
 		require_NoError(t, err)
 	}
 	select {
-	case <-js.PublishAsyncComplete():
+	case <-sjs.PublishAsyncComplete():
 	case <-time.After(5 * time.Second):
 		t.Fatalf("Did not receive completion signal")
 	}
 
-	slow := c.servers[0]
 	mset, err := slow.GlobalAccount().lookupStream("EVENTS")
 	require_NoError(t, err)
 
-	// Make sure preAck is non-nil, so we know the logic has kicked in.
-	mset.mu.RLock()
-	preAcks := mset.preAcks
-	mset.mu.RUnlock()
-	require_NotNil(t, preAcks)
-
-	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
 		state := mset.state()
+		if state.LastSeq != 1000 {
+			return fmt.Errorf("Haven't received all messages yet (last seq %d)", state.LastSeq)
+		}
+		mset.mu.RLock()
+		preAcks := mset.preAcks
+		mset.mu.RUnlock()
+		if preAcks == nil {
+			return fmt.Errorf("Expected to have preAcks by now")
+		}
 		if state.Msgs == 0 {
 			mset.mu.RLock()
 			lp := len(mset.preAcks)
diff --git a/server/test_test.go b/server/test_test.go
index bd7eb1daf91..7985389d144 100644
--- a/server/test_test.go
+++ b/server/test_test.go
@@ -83,7 +83,7 @@ func require_NotNil[T any](t testing.TB, v T) {
 	case reflect.Ptr, reflect.Interface, reflect.Slice,
 		reflect.Map, reflect.Chan, reflect.Func:
 		if r.IsNil() {
-			t.Fatalf("require not nil, but got: %v", v)
+			t.Fatalf("require not nil, but got nil")
 		}
 	}
 }
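
For context on the first patch: in Go, a typed nil (for example a nil `*int`) stored in an interface value compares unequal to `nil`, so the old `v == nil` check in `require_NotNil` could pass even though the caller handed in a nil pointer. The sketch below is a minimal standalone illustration of that behaviour and is not part of either patch; the `isNil` helper is hypothetical and only mirrors the kind switch used by the patched helper.

package main

import (
	"fmt"
	"reflect"
)

// isNil reports whether v wraps a nil value of a nilable kind, using the
// same kind switch as the patched require_NotNil helper.
func isNil(v any) bool {
	r := reflect.ValueOf(v)
	switch r.Kind() {
	case reflect.Ptr, reflect.Interface, reflect.Slice,
		reflect.Map, reflect.Chan, reflect.Func:
		return r.IsNil()
	}
	return false
}

func main() {
	var p *int    // nil pointer
	var v any = p // nil pointer wrapped in a non-nil interface value

	fmt.Println(v == nil) // false: the interface holds a (*int, nil) pair
	fmt.Println(isNil(v)) // true: reflection sees through the wrapper
}

The patched `require_NotNil` applies the same kind switch but fails the test with `t.Fatalf` instead of returning a bool.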