Skip to content

Commit 335bb1f

Browse files
committed
basic rcmgr integration tests
1 parent 78e08c9 commit 335bb1f

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

itest/rcmgr_test.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package itest
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
10+
11+
"github.com/libp2p/go-libp2p"
12+
"github.com/libp2p/go-libp2p-core/peer"
13+
)
14+
15+
func TestResourceManagerConnInbound(t *testing.T) {
16+
// this test checks that we can not exceed the inbound conn limit at system level
17+
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
18+
limiter := rcmgr.NewFixedLimiter(1 << 30)
19+
limiter.SystemLimits = limiter.SystemLimits.
20+
WithConnLimit(3, 1024)
21+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
22+
WithConnLimit(1, 16)
23+
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
24+
25+
for i := 1; i < 4; i++ {
26+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
27+
if err != nil {
28+
t.Fatal(err)
29+
}
30+
time.Sleep(10 * time.Millisecond)
31+
}
32+
33+
for i := 1; i < 4; i++ {
34+
count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID()))
35+
if count != 1 {
36+
t.Fatalf("expected %d connections to peer, got %d", 1, count)
37+
}
38+
}
39+
40+
err := echos[4].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
41+
if err == nil {
42+
t.Fatal("expected ResourceManager to block incoming connection")
43+
}
44+
}
45+
46+
func TestResourceManagerConnOutbound(t *testing.T) {
47+
// this test checks that we can not exceed the inbound conn limit at system level
48+
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
49+
limiter := rcmgr.NewFixedLimiter(1 << 30)
50+
limiter.SystemLimits = limiter.SystemLimits.
51+
WithConnLimit(1024, 3)
52+
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.
53+
WithConnLimit(16, 1)
54+
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
55+
56+
for i := 1; i < 4; i++ {
57+
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[i].Host.ID()})
58+
if err != nil {
59+
t.Fatal(err)
60+
}
61+
time.Sleep(10 * time.Millisecond)
62+
}
63+
64+
for i := 1; i < 4; i++ {
65+
count := len(echos[i].Host.Network().ConnsToPeer(echos[0].Host.ID()))
66+
if count != 1 {
67+
t.Fatalf("expected %d connections to peer, got %d", 1, count)
68+
}
69+
}
70+
71+
err := echos[0].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[4].Host.ID()})
72+
if err == nil {
73+
t.Fatal("expected ResourceManager to block incoming connection")
74+
}
75+
}
76+
77+
func TestResourceManagerServiceInbound(t *testing.T) {
78+
// this test checks that we can not exceed the inbound stream limit at service level
79+
// we specify: 3 streams for the service, and we try to create 4 streams
80+
limiter := rcmgr.NewFixedLimiter(1 << 30)
81+
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.
82+
WithStreamLimit(3, 1024)
83+
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
84+
85+
echos[0].WaitBeforeRead = func() error {
86+
time.Sleep(100 * time.Millisecond)
87+
return nil
88+
}
89+
90+
for i := 1; i < 5; i++ {
91+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
92+
if err != nil {
93+
t.Fatal(err)
94+
}
95+
time.Sleep(10 * time.Millisecond)
96+
}
97+
98+
var wg sync.WaitGroup
99+
for i := 1; i < 5; i++ {
100+
wg.Add(1)
101+
go func(i int) {
102+
defer wg.Done()
103+
104+
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
105+
if err != nil {
106+
t.Log(err)
107+
}
108+
}(i)
109+
}
110+
wg.Wait()
111+
112+
checkEchoStatus(t, echos[0], EchoStatus{
113+
StreamsIn: 4,
114+
EchosIn: 3,
115+
EchosOut: 3,
116+
ResourceServiceErrors: 1,
117+
})
118+
}
119+
120+
func TestResourceManagerServicePeerInbound(t *testing.T) {
121+
// this test checks that we cannot exceed the per peer inbound stream limit at service level
122+
// we specify: 2 streams per peer for echo, and we try to create 3 streams
123+
limiter := rcmgr.NewFixedLimiter(1 << 30)
124+
limiter.ServicePeerLimits = map[string]rcmgr.Limit{
125+
EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024),
126+
}
127+
echos := createEchos(t, 5, libp2p.ResourceManager(rcmgr.NewResourceManager(limiter)))
128+
129+
echos[0].WaitBeforeRead = func() error {
130+
time.Sleep(100 * time.Millisecond)
131+
return nil
132+
}
133+
134+
for i := 1; i < 5; i++ {
135+
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
136+
if err != nil {
137+
t.Fatal(err)
138+
}
139+
time.Sleep(10 * time.Millisecond)
140+
}
141+
142+
var wg sync.WaitGroup
143+
for i := 1; i < 5; i++ {
144+
wg.Add(1)
145+
go func(i int) {
146+
defer wg.Done()
147+
148+
err := echos[i].Echo(echos[0].Host.ID(), "hello libp2p")
149+
if err != nil {
150+
t.Log(err)
151+
}
152+
}(i)
153+
}
154+
wg.Wait()
155+
156+
checkEchoStatus(t, echos[0], EchoStatus{
157+
StreamsIn: 4,
158+
EchosIn: 4,
159+
EchosOut: 4,
160+
ResourceServiceErrors: 0,
161+
})
162+
163+
for i := 0; i < 3; i++ {
164+
wg.Add(1)
165+
go func() {
166+
defer wg.Done()
167+
168+
err := echos[2].Echo(echos[0].Host.ID(), "hello libp2p")
169+
if err != nil {
170+
t.Log(err)
171+
}
172+
}()
173+
}
174+
wg.Wait()
175+
176+
checkEchoStatus(t, echos[0], EchoStatus{
177+
StreamsIn: 7,
178+
EchosIn: 6,
179+
EchosOut: 6,
180+
ResourceServiceErrors: 1,
181+
})
182+
}

0 commit comments

Comments
 (0)