Skip to content

Commit a53c6ad

Browse files
Cherry-picks for 2.10.29-RC.2 (#6853)
Includes the following: - #6851 - #6849 [skip ci] Signed-off-by: Neil Twigg <[email protected]>
2 parents 1b51c8f + c29db59 commit a53c6ad

File tree

4 files changed

+94
-10
lines changed

4 files changed

+94
-10
lines changed

server/monitor.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,17 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
340340

341341
// Search by individual CID.
342342
if cid > 0 {
343-
if state == ConnClosed || state == ConnAll {
343+
// Let's first check if user also selects on ConnOpen or ConnAll
344+
// and look for opened connections.
345+
if state == ConnOpen || state == ConnAll {
346+
if client := s.clients[cid]; client != nil {
347+
openClients = append(openClients, client)
348+
closedClients = nil
349+
}
350+
}
351+
// If we did not find, and the user selected for ConnClosed or ConnAll,
352+
// look for closed connections.
353+
if len(openClients) == 0 && (state == ConnClosed || state == ConnAll) {
344354
copyClosed := closedClients
345355
closedClients = nil
346356
for _, cc := range copyClosed {
@@ -349,11 +359,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
349359
break
350360
}
351361
}
352-
} else if state == ConnOpen || state == ConnAll {
353-
client := s.clients[cid]
354-
if client != nil {
355-
openClients = append(openClients, client)
356-
}
357362
}
358363
} else {
359364
// Gather all open clients.

server/monitor_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,6 +1890,12 @@ func TestConnzWithStateForClosedConns(t *testing.T) {
18901890
if lc := len(c.Conns); lc != 1 {
18911891
return fmt.Errorf("Expected a connection in open array, got %d", lc)
18921892
}
1893+
// It should also work if we ask for "state=all"
1894+
c = pollConz(t, s, mode, url+"connz?cid=2&state=all", &ConnzOptions{CID: 2, State: ConnAll})
1895+
if lc := len(c.Conns); lc != 1 {
1896+
return fmt.Errorf("Expected a connection in open array, got %d", lc)
1897+
}
1898+
// But not for "state=closed"
18931899
c = pollConz(t, s, mode, url+"connz?cid=2&state=closed", &ConnzOptions{CID: 2, State: ConnClosed})
18941900
if lc := len(c.Conns); lc != 0 {
18951901
return fmt.Errorf("Expected no connections in closed array, got %d", lc)

server/raft.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,11 @@ type raft struct {
177177
c *client // Internal client for subscriptions
178178
js *jetStream // JetStream, if running, to see if we are out of resources
179179

180-
dflag bool // Debug flag
181-
hasleader atomic.Bool // Is there a group leader right now?
182-
pleader atomic.Bool // Has the group ever had a leader?
183-
observer bool // The node is observing, i.e. not participating in voting
180+
dflag bool // Debug flag
181+
hasleader atomic.Bool // Is there a group leader right now?
182+
pleader atomic.Bool // Has the group ever had a leader?
183+
observer bool // The node is observing, i.e. not participating in voting
184+
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
184185

185186
extSt extensionState // Extension state
186187

@@ -1538,6 +1539,7 @@ func (n *raft) Campaign() error {
15381539
func (n *raft) CampaignImmediately() error {
15391540
n.Lock()
15401541
defer n.Unlock()
1542+
n.maybeLeader = true
15411543
return n.campaign(minCampaignTimeout / 2)
15421544
}
15431545

@@ -3186,6 +3188,16 @@ func (n *raft) updateLeader(newLeader string) {
31863188
n.hasleader.Store(newLeader != _EMPTY_)
31873189
if !n.pleader.Load() && newLeader != noLeader {
31883190
n.pleader.Store(true)
3191+
// If we were preferred to become the first leader, but didn't end up successful.
3192+
// Ensure to call lead change. When scaling from R1 to R3 we've optimized for a scale up
3193+
// not flipping leader/non-leader/leader status if the leader remains the same. But we need to
3194+
// correct that if the first leader turns out to be different.
3195+
if n.maybeLeader {
3196+
n.maybeLeader = false
3197+
if n.id != newLeader {
3198+
n.updateLeadChange(false)
3199+
}
3200+
}
31893201
}
31903202
}
31913203

server/raft_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,3 +2160,64 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
21602160
n.Applied(2)
21612161
require_True(t, n.Healthy())
21622162
}
2163+
2164+
func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
2165+
tests := []struct {
2166+
title string
2167+
switchNode func(n *raft)
2168+
}{
2169+
{
2170+
title: "Follower",
2171+
},
2172+
{
2173+
title: "Candidate",
2174+
switchNode: func(n *raft) {
2175+
n.switchToCandidate()
2176+
},
2177+
},
2178+
{
2179+
title: "Leader",
2180+
switchNode: func(n *raft) {
2181+
n.switchToCandidate()
2182+
n.switchToLeader()
2183+
select {
2184+
case isLeader := <-n.LeadChangeC():
2185+
require_True(t, isLeader)
2186+
default:
2187+
t.Error("Expected leadChange signal")
2188+
}
2189+
},
2190+
},
2191+
}
2192+
for _, test := range tests {
2193+
t.Run(test.title, func(t *testing.T) {
2194+
n, cleanup := initSingleMemRaftNode(t)
2195+
defer cleanup()
2196+
2197+
// Create a sample entry, the content doesn't matter, just that it's stored.
2198+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2199+
entries := []*Entry{newEntry(EntryNormal, esm)}
2200+
2201+
nats0 := "S1Nunr6R" // "nats-0"
2202+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
2203+
2204+
// Campaigning immediately signals we're the preferred leader.
2205+
require_NoError(t, n.CampaignImmediately())
2206+
if test.switchNode != nil {
2207+
test.switchNode(n)
2208+
}
2209+
2210+
n.processAppendEntry(aeMsg1, n.aesub)
2211+
2212+
select {
2213+
case isLeader := <-n.LeadChangeC():
2214+
require_False(t, isLeader)
2215+
default:
2216+
t.Error("Expected leadChange signal")
2217+
}
2218+
require_Equal(t, n.State(), Follower)
2219+
require_Equal(t, n.leader, nats0)
2220+
require_Equal(t, n.term, 1)
2221+
})
2222+
}
2223+
}

0 commit comments

Comments
 (0)