Skip to content

Cherry-picks for 2.10.29-RC.2 #6853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,17 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {

// Search by individual CID.
if cid > 0 {
if state == ConnClosed || state == ConnAll {
// Let's first check if user also selects on ConnOpen or ConnAll
// and look for opened connections.
if state == ConnOpen || state == ConnAll {
if client := s.clients[cid]; client != nil {
openClients = append(openClients, client)
closedClients = nil
}
}
// If we did not find, and the user selected for ConnClosed or ConnAll,
// look for closed connections.
if len(openClients) == 0 && (state == ConnClosed || state == ConnAll) {
copyClosed := closedClients
closedClients = nil
for _, cc := range copyClosed {
Expand All @@ -349,11 +359,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
break
}
}
} else if state == ConnOpen || state == ConnAll {
client := s.clients[cid]
if client != nil {
openClients = append(openClients, client)
}
}
} else {
// Gather all open clients.
Expand Down
6 changes: 6 additions & 0 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,12 @@ func TestConnzWithStateForClosedConns(t *testing.T) {
if lc := len(c.Conns); lc != 1 {
return fmt.Errorf("Expected a connection in open array, got %d", lc)
}
// It should also work if we ask for "state=all"
c = pollConz(t, s, mode, url+"connz?cid=2&state=all", &ConnzOptions{CID: 2, State: ConnAll})
if lc := len(c.Conns); lc != 1 {
return fmt.Errorf("Expected a connection in open array, got %d", lc)
}
// But not for "state=closed"
c = pollConz(t, s, mode, url+"connz?cid=2&state=closed", &ConnzOptions{CID: 2, State: ConnClosed})
if lc := len(c.Conns); lc != 0 {
return fmt.Errorf("Expected no connections in closed array, got %d", lc)
Expand Down
20 changes: 16 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ type raft struct {
c *client // Internal client for subscriptions
js *jetStream // JetStream, if running, to see if we are out of resources

dflag bool // Debug flag
hasleader atomic.Bool // Is there a group leader right now?
pleader atomic.Bool // Has the group ever had a leader?
observer bool // The node is observing, i.e. not participating in voting
dflag bool // Debug flag
hasleader atomic.Bool // Is there a group leader right now?
pleader atomic.Bool // Has the group ever had a leader?
observer bool // The node is observing, i.e. not participating in voting
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.

extSt extensionState // Extension state

Expand Down Expand Up @@ -1538,6 +1539,7 @@ func (n *raft) Campaign() error {
func (n *raft) CampaignImmediately() error {
n.Lock()
defer n.Unlock()
n.maybeLeader = true
return n.campaign(minCampaignTimeout / 2)
}

Expand Down Expand Up @@ -3186,6 +3188,16 @@ func (n *raft) updateLeader(newLeader string) {
n.hasleader.Store(newLeader != _EMPTY_)
if !n.pleader.Load() && newLeader != noLeader {
n.pleader.Store(true)
// If we were preferred to become the first leader, but didn't end up successful.
// Ensure to call lead change. When scaling from R1 to R3 we've optimized for a scale up
// not flipping leader/non-leader/leader status if the leader remains the same. But we need to
// correct that if the first leader turns out to be different.
if n.maybeLeader {
n.maybeLeader = false
if n.id != newLeader {
n.updateLeadChange(false)
}
}
}
}

Expand Down
61 changes: 61 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2160,3 +2160,64 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
n.Applied(2)
require_True(t, n.Healthy())
}

func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
tests := []struct {
title string
switchNode func(n *raft)
}{
{
title: "Follower",
},
{
title: "Candidate",
switchNode: func(n *raft) {
n.switchToCandidate()
},
},
{
title: "Leader",
switchNode: func(n *raft) {
n.switchToCandidate()
n.switchToLeader()
select {
case isLeader := <-n.LeadChangeC():
require_True(t, isLeader)
default:
t.Error("Expected leadChange signal")
}
},
},
}
for _, test := range tests {
t.Run(test.title, func(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})

// Campaigning immediately signals we're the preferred leader.
require_NoError(t, n.CampaignImmediately())
if test.switchNode != nil {
test.switchNode(n)
}

n.processAppendEntry(aeMsg1, n.aesub)

select {
case isLeader := <-n.LeadChangeC():
require_False(t, isLeader)
default:
t.Error("Expected leadChange signal")
}
require_Equal(t, n.State(), Follower)
require_Equal(t, n.leader, nats0)
require_Equal(t, n.term, 1)
})
}
}