Skip to content

Commit 5615715

Browse files
authored
feat: log when resource manager limits are exceeded (#8980)
This periodically logs how many times Resource Manager limits were exceeded. If they aren't exceeded, then nothing is logged. The log levels are at ERROR log level so that they are shown by default. The motivation is so that users know when they have exceeded resource manager limits. To find what is exceeding the limits, they'll need to turn on debug logging and inspect the errors being logged. This could collect the specific limits being reached, but that's more complicated to implement and could result in much longer log messages.
1 parent 650bc24 commit 5615715

File tree

5 files changed

+236
-6
lines changed

5 files changed

+236
-6
lines changed

core/commands/swarm.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel
409409
return errors.New("expected a JSON file")
410410
}
411411
if err := json.NewDecoder(file).Decode(&newLimit); err != nil {
412-
return errors.New("failed to decode JSON as ResourceMgrScopeConfig")
412+
return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err)
413413
}
414414
return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit)
415415
}

core/node/libp2p/rcmgr.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"path/filepath"
88
"strings"
99

10+
"github.com/benbjohnson/clock"
1011
config "github.com/ipfs/go-ipfs/config"
12+
"github.com/ipfs/go-ipfs/core/node/helpers"
1113
"github.com/ipfs/go-ipfs/repo"
12-
14+
logging "github.com/ipfs/go-log/v2"
1315
"github.com/libp2p/go-libp2p"
1416
"github.com/libp2p/go-libp2p-core/network"
1517
"github.com/libp2p/go-libp2p-core/peer"
@@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz"
2426

2527
var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")
2628

27-
func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
28-
return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
29+
func ResourceManager(cfg config.SwarmConfig) interface{} {
30+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
2931
var manager network.ResourceManager
3032
var opts Libp2pOpts
3133

@@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw
7274
if err != nil {
7375
return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
7476
}
77+
lrm := &loggingResourceManager{
78+
clock: clock.New(),
79+
logger: &logging.Logger("resourcemanager").SugaredLogger,
80+
delegate: manager,
81+
}
82+
lrm.start(helpers.LifecycleCtx(mctx, lc))
83+
manager = lrm
7584
} else {
7685
log.Debug("libp2p resource manager is disabled")
7786
manager = network.NullResourceManager

core/node/libp2p/rcmgr_logging.go

+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package libp2p
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
"github.com/benbjohnson/clock"
10+
"github.com/libp2p/go-libp2p-core/network"
11+
"github.com/libp2p/go-libp2p-core/peer"
12+
"github.com/libp2p/go-libp2p-core/protocol"
13+
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
14+
"go.uber.org/zap"
15+
)
16+
17+
type loggingResourceManager struct {
18+
clock clock.Clock
19+
logger *zap.SugaredLogger
20+
delegate network.ResourceManager
21+
logInterval time.Duration
22+
23+
mut sync.Mutex
24+
limitExceededErrs uint64
25+
}
26+
27+
type loggingScope struct {
28+
logger *zap.SugaredLogger
29+
delegate network.ResourceScope
30+
countErrs func(error)
31+
}
32+
33+
var _ network.ResourceManager = (*loggingResourceManager)(nil)
34+
35+
func (n *loggingResourceManager) start(ctx context.Context) {
36+
logInterval := n.logInterval
37+
if logInterval == 0 {
38+
logInterval = 10 * time.Second
39+
}
40+
ticker := n.clock.Ticker(logInterval)
41+
go func() {
42+
defer ticker.Stop()
43+
for {
44+
select {
45+
case <-ticker.C:
46+
n.mut.Lock()
47+
errs := n.limitExceededErrs
48+
n.limitExceededErrs = 0
49+
n.mut.Unlock()
50+
if errs != 0 {
51+
n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs)
52+
}
53+
case <-ctx.Done():
54+
return
55+
}
56+
}
57+
}()
58+
}
59+
60+
func (n *loggingResourceManager) countErrs(err error) {
61+
if errors.Is(err, network.ErrResourceLimitExceeded) {
62+
n.mut.Lock()
63+
n.limitExceededErrs++
64+
n.mut.Unlock()
65+
}
66+
}
67+
68+
func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error {
69+
return n.delegate.ViewSystem(f)
70+
}
71+
func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error {
72+
return n.delegate.ViewTransient(func(s network.ResourceScope) error {
73+
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
74+
})
75+
}
76+
func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error {
77+
return n.delegate.ViewService(svc, func(s network.ServiceScope) error {
78+
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
79+
})
80+
}
81+
func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error {
82+
return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error {
83+
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
84+
})
85+
}
86+
func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
87+
return n.delegate.ViewPeer(p, func(s network.PeerScope) error {
88+
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
89+
})
90+
}
91+
func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) {
92+
connMgmtScope, err := n.delegate.OpenConnection(dir, usefd)
93+
n.countErrs(err)
94+
return connMgmtScope, err
95+
}
96+
func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
97+
connMgmtScope, err := n.delegate.OpenStream(p, dir)
98+
n.countErrs(err)
99+
return connMgmtScope, err
100+
}
101+
func (n *loggingResourceManager) Close() error {
102+
return n.delegate.Close()
103+
}
104+
105+
func (s *loggingScope) ReserveMemory(size int, prio uint8) error {
106+
err := s.delegate.ReserveMemory(size, prio)
107+
s.countErrs(err)
108+
return err
109+
}
110+
func (s *loggingScope) ReleaseMemory(size int) {
111+
s.delegate.ReleaseMemory(size)
112+
}
113+
func (s *loggingScope) Stat() network.ScopeStat {
114+
return s.delegate.Stat()
115+
}
116+
func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) {
117+
return s.delegate.BeginSpan()
118+
}
119+
func (s *loggingScope) Done() {
120+
s.delegate.(network.ResourceScopeSpan).Done()
121+
}
122+
func (s *loggingScope) Name() string {
123+
return s.delegate.(network.ServiceScope).Name()
124+
}
125+
func (s *loggingScope) Protocol() protocol.ID {
126+
return s.delegate.(network.ProtocolScope).Protocol()
127+
}
128+
func (s *loggingScope) Peer() peer.ID {
129+
return s.delegate.(network.PeerScope).Peer()
130+
}
131+
func (s *loggingScope) PeerScope() network.PeerScope {
132+
return s.delegate.(network.PeerScope)
133+
}
134+
func (s *loggingScope) SetPeer(p peer.ID) error {
135+
err := s.delegate.(network.ConnManagementScope).SetPeer(p)
136+
s.countErrs(err)
137+
return err
138+
}
139+
func (s *loggingScope) ProtocolScope() network.ProtocolScope {
140+
return s.delegate.(network.ProtocolScope)
141+
}
142+
func (s *loggingScope) SetProtocol(proto protocol.ID) error {
143+
err := s.delegate.(network.StreamManagementScope).SetProtocol(proto)
144+
s.countErrs(err)
145+
return err
146+
}
147+
func (s *loggingScope) ServiceScope() network.ServiceScope {
148+
return s.delegate.(network.ServiceScope)
149+
}
150+
func (s *loggingScope) SetService(srv string) error {
151+
err := s.delegate.(network.StreamManagementScope).SetService(srv)
152+
s.countErrs(err)
153+
return err
154+
}
155+
func (s *loggingScope) Limit() rcmgr.Limit {
156+
return s.delegate.(rcmgr.ResourceScopeLimiter).Limit()
157+
}
158+
func (s *loggingScope) SetLimit(limit rcmgr.Limit) {
159+
s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit)
160+
}
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package libp2p
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/benbjohnson/clock"
9+
"github.com/libp2p/go-libp2p-core/network"
10+
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
11+
"github.com/stretchr/testify/require"
12+
"go.uber.org/zap"
13+
"go.uber.org/zap/zaptest/observer"
14+
)
15+
16+
func TestLoggingResourceManager(t *testing.T) {
17+
clock := clock.NewMock()
18+
limiter := rcmgr.NewDefaultLimiter()
19+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1)
20+
rm, err := rcmgr.NewResourceManager(limiter)
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
25+
oCore, oLogs := observer.New(zap.WarnLevel)
26+
oLogger := zap.New(oCore)
27+
lrm := &loggingResourceManager{
28+
clock: clock,
29+
logger: oLogger.Sugar(),
30+
delegate: rm,
31+
logInterval: 1 * time.Second,
32+
}
33+
34+
// 2 of these should result in resource limit exceeded errors and subsequent log messages
35+
for i := 0; i < 3; i++ {
36+
_, _ = lrm.OpenConnection(network.DirInbound, false)
37+
}
38+
39+
// run the logger which will write an entry for those errors
40+
ctx, cancel := context.WithCancel(context.Background())
41+
defer cancel()
42+
lrm.start(ctx)
43+
clock.Add(3 * time.Second)
44+
45+
timer := time.NewTimer(1 * time.Second)
46+
for {
47+
select {
48+
case <-timer.C:
49+
t.Fatalf("expected logs never arrived")
50+
default:
51+
if oLogs.Len() == 0 {
52+
continue
53+
}
54+
require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message)
55+
return
56+
}
57+
}
58+
}

go.mod

+5-2
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,17 @@ require (
125125
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
126126
)
127127

128+
require (
129+
github.com/benbjohnson/clock v1.3.0
130+
github.com/ipfs/go-log/v2 v2.5.1
131+
)
132+
128133
require (
129134
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
130135
github.com/Kubuxu/go-os-helper v0.0.1 // indirect
131136
github.com/Stebalien/go-bitfield v0.0.1 // indirect
132137
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
133138
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
134-
github.com/benbjohnson/clock v1.3.0 // indirect
135139
github.com/beorn7/perks v1.0.1 // indirect
136140
github.com/btcsuite/btcd v0.22.0-beta // indirect
137141
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
@@ -172,7 +176,6 @@ require (
172176
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
173177
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
174178
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
175-
github.com/ipfs/go-log/v2 v2.5.1 // indirect
176179
github.com/ipfs/go-peertaskqueue v0.7.1 // indirect
177180
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
178181
github.com/klauspost/compress v1.15.1 // indirect

0 commit comments

Comments
 (0)