diff --git a/server/monitor.go b/server/monitor.go
index 05214b999b4..6efe52a36cd 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -340,7 +340,17 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
 
 	// Search by individual CID.
 	if cid > 0 {
-		if state == ConnClosed || state == ConnAll {
+		// Let's first check if the user also selected ConnOpen or ConnAll
+		// and look for open connections.
+		if state == ConnOpen || state == ConnAll {
+			if client := s.clients[cid]; client != nil {
+				openClients = append(openClients, client)
+				closedClients = nil
+			}
+		}
+		// If we did not find one, and the user selected ConnClosed or ConnAll,
+		// look for closed connections.
+		if len(openClients) == 0 && (state == ConnClosed || state == ConnAll) {
 			copyClosed := closedClients
 			closedClients = nil
 			for _, cc := range copyClosed {
@@ -349,11 +359,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
 					break
 				}
 			}
-		} else if state == ConnOpen || state == ConnAll {
-			client := s.clients[cid]
-			if client != nil {
-				openClients = append(openClients, client)
-			}
 		}
 	} else {
 		// Gather all open clients.
diff --git a/server/monitor_test.go b/server/monitor_test.go
index a1c0d806d63..c5be3239d03 100644
--- a/server/monitor_test.go
+++ b/server/monitor_test.go
@@ -1890,6 +1890,12 @@ func TestConnzWithStateForClosedConns(t *testing.T) {
 			if lc := len(c.Conns); lc != 1 {
 				return fmt.Errorf("Expected a connection in open array, got %d", lc)
 			}
+			// It should also work if we ask for "state=all"
+			c = pollConz(t, s, mode, url+"connz?cid=2&state=all", &ConnzOptions{CID: 2, State: ConnAll})
+			if lc := len(c.Conns); lc != 1 {
+				return fmt.Errorf("Expected a connection in open array, got %d", lc)
+			}
+			// But not for "state=closed"
 			c = pollConz(t, s, mode, url+"connz?cid=2&state=closed", &ConnzOptions{CID: 2, State: ConnClosed})
 			if lc := len(c.Conns); lc != 0 {
 				return fmt.Errorf("Expected no connections in closed array, got %d", lc)
diff --git a/server/raft.go b/server/raft.go
index 2b3aafb01c2..7c50dc943dc 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -177,10 +177,11 @@ type raft struct {
 	c  *client    // Internal client for subscriptions
 	js *jetStream // JetStream, if running, to see if we are out of resources
 
-	dflag     bool        // Debug flag
-	hasleader atomic.Bool // Is there a group leader right now?
-	pleader   atomic.Bool // Has the group ever had a leader?
-	observer  bool        // The node is observing, i.e. not participating in voting
+	dflag       bool        // Debug flag
+	hasleader   atomic.Bool // Is there a group leader right now?
+	pleader     atomic.Bool // Has the group ever had a leader?
+	observer    bool        // The node is observing, i.e. not participating in voting
+	maybeLeader bool        // The group had a preferred leader, and it may already be acting as leader prior to scale up.
 
 	extSt extensionState // Extension state
 
@@ -1538,6 +1539,7 @@ func (n *raft) Campaign() error {
 func (n *raft) CampaignImmediately() error {
 	n.Lock()
 	defer n.Unlock()
+	n.maybeLeader = true
 	return n.campaign(minCampaignTimeout / 2)
 }
 
@@ -3186,6 +3188,16 @@ func (n *raft) updateLeader(newLeader string) {
 	n.hasleader.Store(newLeader != _EMPTY_)
 	if !n.pleader.Load() && newLeader != noLeader {
 		n.pleader.Store(true)
+		// If we were the preferred node to become the first leader but didn't end up successful,
+		// ensure we call lead change. When scaling from R1 to R3 we've optimized a scale up to
+		// not flip leader/non-leader/leader status if the leader remains the same. But we need to
+		// correct that if the first leader turns out to be different.
+		if n.maybeLeader {
+			n.maybeLeader = false
+			if n.id != newLeader {
+				n.updateLeadChange(false)
+			}
+		}
 	}
 }
 
diff --git a/server/raft_test.go b/server/raft_test.go
index d1a894965e0..fc439fc6bd6 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -2160,3 +2160,64 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
 	n.Applied(2)
 	require_True(t, n.Healthy())
 }
+
+func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
+	tests := []struct {
+		title      string
+		switchNode func(n *raft)
+	}{
+		{
+			title: "Follower",
+		},
+		{
+			title: "Candidate",
+			switchNode: func(n *raft) {
+				n.switchToCandidate()
+			},
+		},
+		{
+			title: "Leader",
+			switchNode: func(n *raft) {
+				n.switchToCandidate()
+				n.switchToLeader()
+				select {
+				case isLeader := <-n.LeadChangeC():
+					require_True(t, isLeader)
+				default:
+					t.Error("Expected leadChange signal")
+				}
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.title, func(t *testing.T) {
+			n, cleanup := initSingleMemRaftNode(t)
+			defer cleanup()
+
+			// Create a sample entry; the content doesn't matter, just that it's stored.
+			esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
+			entries := []*Entry{newEntry(EntryNormal, esm)}
+
+			nats0 := "S1Nunr6R" // "nats-0"
+			aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
+
+			// Campaigning immediately signals we're the preferred leader.
+			require_NoError(t, n.CampaignImmediately())
+			if test.switchNode != nil {
+				test.switchNode(n)
+			}
+
+			n.processAppendEntry(aeMsg1, n.aesub)
+
+			select {
+			case isLeader := <-n.LeadChangeC():
+				require_False(t, isLeader)
+			default:
+				t.Error("Expected leadChange signal")
+			}
+			require_Equal(t, n.State(), Follower)
+			require_Equal(t, n.leader, nats0)
+			require_Equal(t, n.term, 1)
+		})
+	}
+}