Skip to content

Commit aa7c048

Browse files
authored
fix(autorelay): Move relayFinder peer disconnect cleanup to separate goroutine (#3105)
1 parent 88ae979 commit aa7c048

File tree

1 file changed

+42
-32
lines changed

1 file changed

+42
-32
lines changed

p2p/host/autorelay/relay_finder.go

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,45 @@ type scheduledWorkTimes struct {
115115
nextAllowedCallToPeerSource time.Time
116116
}
117117

118+
func (rf *relayFinder) cleanupDisconnectedPeers(ctx context.Context) {
119+
subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"), eventbus.BufSize(32))
120+
if err != nil {
121+
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
122+
return
123+
}
124+
defer subConnectedness.Close()
125+
for {
126+
select {
127+
case <-ctx.Done():
128+
return
129+
case ev, ok := <-subConnectedness.Out():
130+
if !ok {
131+
return
132+
}
133+
evt := ev.(event.EvtPeerConnectednessChanged)
134+
if evt.Connectedness != network.NotConnected {
135+
continue
136+
}
137+
push := false
138+
139+
rf.relayMx.Lock()
140+
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
141+
log.Debugw("disconnected from relay", "id", evt.Peer)
142+
delete(rf.relays, evt.Peer)
143+
rf.notifyMaybeConnectToRelay()
144+
rf.notifyMaybeNeedNewCandidates()
145+
push = true
146+
}
147+
rf.relayMx.Unlock()
148+
149+
if push {
150+
rf.clearCachedAddrsAndSignalAddressChange()
151+
rf.metricsTracer.ReservationEnded(1)
152+
}
153+
}
154+
}
155+
}
156+
118157
func (rf *relayFinder) background(ctx context.Context) {
119158
peerSourceRateLimiter := make(chan struct{}, 1)
120159
rf.refCount.Add(1)
@@ -129,13 +168,6 @@ func (rf *relayFinder) background(ctx context.Context) {
129168
rf.handleNewCandidates(ctx)
130169
}()
131170

132-
subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"))
133-
if err != nil {
134-
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
135-
return
136-
}
137-
defer subConnectedness.Close()
138-
139171
now := rf.conf.clock.Now()
140172
bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay))
141173
defer bootDelayTimer.Stop()
@@ -164,32 +196,10 @@ func (rf *relayFinder) background(ctx context.Context) {
164196
workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter))
165197
defer workTimer.Stop()
166198

199+
go rf.cleanupDisconnectedPeers(ctx)
200+
167201
for {
168202
select {
169-
case ev, ok := <-subConnectedness.Out():
170-
if !ok {
171-
return
172-
}
173-
evt := ev.(event.EvtPeerConnectednessChanged)
174-
if evt.Connectedness != network.NotConnected {
175-
continue
176-
}
177-
push := false
178-
179-
rf.relayMx.Lock()
180-
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
181-
log.Debugw("disconnected from relay", "id", evt.Peer)
182-
delete(rf.relays, evt.Peer)
183-
rf.notifyMaybeConnectToRelay()
184-
rf.notifyMaybeNeedNewCandidates()
185-
push = true
186-
}
187-
rf.relayMx.Unlock()
188-
189-
if push {
190-
rf.clearCachedAddrsAndSignalAddressChange()
191-
rf.metricsTracer.ReservationEnded(1)
192-
}
193203
case <-rf.candidateFound:
194204
rf.notifyMaybeConnectToRelay()
195205
case <-bootDelayTimer.Ch():
@@ -264,7 +274,7 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche
264274
if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
265275
nextTime = scheduledWork.nextOldCandidateCheck
266276
}
267-
if nextTime == now {
277+
if nextTime.Equal(now) {
268278
// Only happens in CI with a mock clock
269279
nextTime = nextTime.Add(1) // avoids an infinite loop
270280
}

0 commit comments

Comments
 (0)