Skip to content

Commit 426203b

Browse files
committed
basic rcmgr integration tests
1 parent 15ee0d6 commit 426203b

File tree

3 files changed

+275
-38
lines changed

3 files changed

+275
-38
lines changed

itest/echo.go

+24-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
type Echo struct {
2727
Host host.Host
2828

29-
WaitBeforeRead, WaitBeforeWrite func() error
29+
BeforeReserve, BeforeRead, BeforeWrite, BeforeDone func() error
3030

3131
mx sync.Mutex
3232
status EchoStatus
@@ -60,6 +60,15 @@ func (e *Echo) handleStream(s network.Stream) {
6060
e.status.StreamsIn++
6161
e.mx.Unlock()
6262

63+
if e.BeforeReserve != nil {
64+
if err := e.BeforeReserve(); err != nil {
65+
echoLog.Debugf("error syncing before reserve: %s", err)
66+
67+
s.Reset()
68+
return
69+
}
70+
}
71+
6372
if err := s.Scope().SetService(EchoService); err != nil {
6473
echoLog.Debugf("error attaching stream to echo service: %s", err)
6574

@@ -82,9 +91,9 @@ func (e *Echo) handleStream(s network.Stream) {
8291
return
8392
}
8493

85-
if e.WaitBeforeRead != nil {
86-
if err := e.WaitBeforeRead(); err != nil {
87-
echoLog.Debugf("error waiting before read: %s", err)
94+
if e.BeforeRead != nil {
95+
if err := e.BeforeRead(); err != nil {
96+
echoLog.Debugf("error syncing before read: %s", err)
8897

8998
s.Reset()
9099
return
@@ -116,9 +125,9 @@ func (e *Echo) handleStream(s network.Stream) {
116125
e.status.EchosIn++
117126
e.mx.Unlock()
118127

119-
if e.WaitBeforeWrite != nil {
120-
if err := e.WaitBeforeWrite(); err != nil {
121-
echoLog.Debugf("error waiting before write: %s", err)
128+
if e.BeforeWrite != nil {
129+
if err := e.BeforeWrite(); err != nil {
130+
echoLog.Debugf("error syncing before write: %s", err)
122131

123132
s.Reset()
124133
return
@@ -143,6 +152,14 @@ func (e *Echo) handleStream(s network.Stream) {
143152
e.mx.Unlock()
144153

145154
s.CloseWrite()
155+
156+
if e.BeforeDone != nil {
157+
if err := e.BeforeDone(); err != nil {
158+
echoLog.Debugf("error syncing before done: %s", err)
159+
160+
s.Reset()
161+
}
162+
}
146163
}
147164

148165
func (e *Echo) Echo(p peer.ID, what string) error {

itest/echo_test.go

+18-31
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@ import (
77
"github.com/libp2p/go-libp2p"
88
"github.com/libp2p/go-libp2p-core/peer"
99
"github.com/libp2p/go-libp2p-core/peerstore"
10+
11+
"github.com/stretchr/testify/require"
1012
)
1113

12-
func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
14+
func createEchos(t *testing.T, count int, makeOpts ...func() libp2p.Option) []*Echo {
1315
result := make([]*Echo, 0, count)
1416

1517
for i := 0; i < count; i++ {
18+
opts := make([]libp2p.Option, 0, len(makeOpts))
19+
for _, makeOpt := range makeOpts {
20+
opts = append(opts, makeOpt())
21+
}
22+
1623
h, err := libp2p.New(opts...)
1724
if err != nil {
1825
t.Fatal(err)
@@ -35,46 +42,26 @@ func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
3542
return result
3643
}
3744

45+
func closeEchos(echos []*Echo) {
46+
for _, e := range echos {
47+
e.Host.Close()
48+
}
49+
}
50+
3851
func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
3952
t.Helper()
40-
41-
status := e.Status()
42-
43-
if status.StreamsIn != expected.StreamsIn {
44-
t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn)
45-
}
46-
if status.EchosIn != expected.EchosIn {
47-
t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn)
48-
}
49-
if status.EchosOut != expected.EchosOut {
50-
t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut)
51-
}
52-
if status.IOErrors != expected.IOErrors {
53-
t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors)
54-
}
55-
if status.ResourceServiceErrors != expected.ResourceServiceErrors {
56-
t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors)
57-
}
58-
if status.ResourceReservationErrors != expected.ResourceReservationErrors {
59-
t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors)
60-
}
53+
require.Equal(t, expected, e.Status())
6154
}
6255

6356
func TestEcho(t *testing.T) {
6457
echos := createEchos(t, 2)
58+
defer closeEchos(echos)
6559

66-
err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()})
67-
if err != nil {
60+
if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil {
6861
t.Fatal(err)
6962
}
7063

71-
defer func() {
72-
for _, e := range echos {
73-
e.Host.Close()
74-
}
75-
}()
76-
77-
if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
64+
if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
7865
t.Fatal(err)
7966
}
8067

itest/rcmgr_test.go

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package itest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
12+
13+
"github.com/libp2p/go-libp2p"
14+
"github.com/libp2p/go-libp2p-core/peer"
15+
)
16+
17+
func makeRcmgrOption(t *testing.T, limiter *rcmgr.BasicLimiter) func() libp2p.Option {
18+
return func() libp2p.Option {
19+
mgr, err := rcmgr.NewResourceManager(limiter)
20+
if err != nil {
21+
t.Fatal(err)
22+
}
23+
return libp2p.ResourceManager(mgr)
24+
}
25+
}
26+
27+
func TestResourceManagerConnInbound(t *testing.T) {
28+
// this test checks that we can not exceed the inbound conn limit at system level
29+
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
30+
limiter := rcmgr.NewFixedLimiter(1 << 30)
31+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024, 1024)
32+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 16, 16)
33+
34+
echos := createEchos(t, 5, makeRcmgrOption(t, limiter))
35+
defer closeEchos(echos)
36+
37+
for i := 1; i < 4; i++ {
38+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
time.Sleep(10 * time.Millisecond)
43+
}
44+
45+
for i := 1; i < 4; i++ {
46+
count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID()))
47+
if count != 1 {
48+
t.Fatalf("expected %d connections to peer, got %d", 1, count)
49+
}
50+
}
51+
52+
err := echos[4].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
53+
if err == nil {
54+
t.Fatal("expected ResourceManager to block incoming connection")
55+
}
56+
}
57+
58+
func TestResourceManagerConnOutbound(t *testing.T) {
59+
// this test checks that we can not exceed the inbound conn limit at system level
60+
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
61+
limiter := rcmgr.NewFixedLimiter(1 << 30)
62+
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3, 1024)
63+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(16, 1, 16)
64+
echos := createEchos(t, 5, makeRcmgrOption(t, limiter))
65+
defer closeEchos(echos)
66+
67+
for i := 1; i < 4; i++ {
68+
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()})
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
time.Sleep(10 * time.Millisecond)
73+
}
74+
75+
for i := 1; i < 4; i++ {
76+
count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID()))
77+
if count != 1 {
78+
t.Fatalf("expected %d connections to peer, got %d", 1, count)
79+
}
80+
}
81+
82+
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[4].Host.ID()})
83+
if err == nil {
84+
t.Fatal("expected ResourceManager to block incoming connection")
85+
}
86+
}
87+
88+
func TestResourceManagerServiceInbound(t *testing.T) {
89+
// this test checks that we can not exceed the inbound stream limit at service level
90+
// we specify: 3 streams for the service, and we try to create 4 streams
91+
limiter := rcmgr.NewFixedLimiter(1 << 30)
92+
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024, 1024)
93+
echos := createEchos(t, 5, makeRcmgrOption(t, limiter))
94+
defer closeEchos(echos)
95+
96+
ready := new(chan struct{})
97+
echos[0].BeforeDone = waitForChannel(ready, time.Minute)
98+
99+
for i := 1; i < 5; i++ {
100+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
time.Sleep(10 * time.Millisecond)
105+
}
106+
107+
*ready = make(chan struct{})
108+
109+
var once sync.Once
110+
var wg sync.WaitGroup
111+
for i := 1; i < 5; i++ {
112+
wg.Add(1)
113+
go func(i int) {
114+
defer wg.Done()
115+
116+
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
117+
if err != nil {
118+
t.Log(err)
119+
once.Do(func() {
120+
close(*ready)
121+
})
122+
}
123+
}(i)
124+
}
125+
wg.Wait()
126+
127+
checkEchoStatus(t, echos[0], EchoStatus{
128+
StreamsIn: 4,
129+
EchosIn: 3,
130+
EchosOut: 3,
131+
ResourceServiceErrors: 1,
132+
})
133+
}
134+
135+
func TestResourceManagerServicePeerInbound(t *testing.T) {
136+
// this test checks that we cannot exceed the per peer inbound stream limit at service level
137+
// we specify: 2 streams per peer for echo, and we try to create 3 streams
138+
limiter := rcmgr.NewFixedLimiter(1 << 30)
139+
limiter.ServicePeerLimits = map[string]rcmgr.Limit{
140+
EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024, 1024),
141+
}
142+
echos := createEchos(t, 5, makeRcmgrOption(t, limiter))
143+
defer closeEchos(echos)
144+
145+
count := new(int32)
146+
ready := new(chan struct{})
147+
echos[0].BeforeDone = waitForBarrier(count, ready, time.Minute)
148+
149+
for i := 1; i < 5; i++ {
150+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
151+
if err != nil {
152+
t.Fatal(err)
153+
}
154+
time.Sleep(10 * time.Millisecond)
155+
}
156+
157+
*count = 4
158+
*ready = make(chan struct{})
159+
160+
var wg sync.WaitGroup
161+
for i := 1; i < 5; i++ {
162+
wg.Add(1)
163+
go func(i int) {
164+
defer wg.Done()
165+
166+
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
167+
if err != nil {
168+
t.Log(err)
169+
}
170+
}(i)
171+
}
172+
wg.Wait()
173+
174+
checkEchoStatus(t, echos[0], EchoStatus{
175+
StreamsIn: 4,
176+
EchosIn: 4,
177+
EchosOut: 4,
178+
ResourceServiceErrors: 0,
179+
})
180+
181+
*ready = make(chan struct{})
182+
echos[0].BeforeDone = waitForChannel(ready, time.Minute)
183+
184+
var once sync.Once
185+
for i := 0; i < 3; i++ {
186+
wg.Add(1)
187+
go func() {
188+
defer wg.Done()
189+
190+
err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p")
191+
if err != nil {
192+
t.Log(err)
193+
once.Do(func() {
194+
close(*ready)
195+
})
196+
}
197+
}()
198+
}
199+
wg.Wait()
200+
201+
checkEchoStatus(t, echos[0], EchoStatus{
202+
StreamsIn: 7,
203+
EchosIn: 6,
204+
EchosOut: 6,
205+
ResourceServiceErrors: 1,
206+
})
207+
}
208+
209+
func waitForBarrier(count *int32, ready *chan struct{}, timeout time.Duration) func() error {
210+
return func() error {
211+
if atomic.AddInt32(count, -1) == 0 {
212+
close(*ready)
213+
}
214+
215+
select {
216+
case <-*ready:
217+
return nil
218+
case <-time.After(timeout):
219+
return fmt.Errorf("timeout")
220+
}
221+
}
222+
}
223+
224+
func waitForChannel(ready *chan struct{}, timeout time.Duration) func() error {
225+
return func() error {
226+
select {
227+
case <-*ready:
228+
return nil
229+
case <-time.After(timeout):
230+
return fmt.Errorf("timeout")
231+
}
232+
}
233+
}

0 commit comments

Comments
 (0)