Skip to content

Commit ba33799

Browse files
committed
add semaphore to control push/delta concurrency
This avoids stream storms that can clog the transient scope when we have a largish number of peers.
1 parent da67311 commit ba33799

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

p2p/protocol/identify/id.go

+8
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ const LibP2PVersion = "ipfs/0.1.0"
4343

4444
const ServiceName = "libp2p.identify"
4545

46+
const maxPushConcurrency = 32
47+
4648
// StreamReadTimeout is the read timeout on all incoming Identify family streams.
4749
var StreamReadTimeout = 60 * time.Second
4850

@@ -129,6 +131,10 @@ type idService struct {
129131

130132
addPeerHandlerCh chan addPeerHandlerReq
131133
rmPeerHandlerCh chan rmPeerHandlerReq
134+
135+
// pushSemaphore limits the push/delta concurrency to avoid storms
136+
// that clog the transient scope.
137+
pushSemaphore chan struct{}
132138
}
133139

134140
// NewIDService constructs a new *idService and activates it by
@@ -154,6 +160,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
154160

155161
addPeerHandlerCh: make(chan addPeerHandlerReq),
156162
rmPeerHandlerCh: make(chan rmPeerHandlerReq),
163+
164+
pushSemaphore: make(chan struct{}, maxPushConcurrency),
157165
}
158166
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
159167

p2p/protocol/identify/peer_loop.go

+5
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network
179179
return nil, errProtocolNotSupported
180180
}
181181

182+
ph.ids.pushSemaphore <- struct{}{}
183+
defer func() {
184+
<-ph.ids.pushSemaphore
185+
}()
186+
182187
// negotiate a stream without opening a new connection as we "should" already have a connection.
183188
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
184189
defer cancel()

0 commit comments

Comments
 (0)