Skip to content

Commit f2fa0b6

Browse files
authored
Merge pull request #1315 from libp2p/fix/flaky-rcmgr-test
fix flaky resource manager tests
2 parents c681541 + edc86d9 commit f2fa0b6

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

itest/echo.go

+19
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Echo struct {
3030
status EchoStatus
3131

3232
beforeReserve, beforeRead, beforeWrite, beforeDone func() error
33+
done func()
3334
}
3435

3536
type EchoStatus struct {
@@ -81,6 +82,13 @@ func (e *Echo) BeforeDone(f func() error) {
8182
e.beforeDone = f
8283
}
8384

85+
func (e *Echo) Done(f func()) {
86+
e.mx.Lock()
87+
defer e.mx.Unlock()
88+
89+
e.done = f
90+
}
91+
8492
func (e *Echo) getBeforeReserve() func() error {
8593
e.mx.Lock()
8694
defer e.mx.Unlock()
@@ -109,9 +117,20 @@ func (e *Echo) getBeforeDone() func() error {
109117
return e.beforeDone
110118
}
111119

120+
func (e *Echo) getDone() func() {
121+
e.mx.Lock()
122+
defer e.mx.Unlock()
123+
124+
return e.done
125+
}
126+
112127
func (e *Echo) handleStream(s network.Stream) {
113128
defer s.Close()
114129

130+
if done := e.getDone(); done != nil {
131+
defer done()
132+
}
133+
115134
e.mx.Lock()
116135
e.status.StreamsIn++
117136
e.mx.Unlock()

itest/rcmgr_test.go

+23-11
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ func TestResourceManagerServiceInbound(t *testing.T) {
109109
defer closeEchos(echos)
110110
defer closeRcmgrs(echos)
111111

112-
ready := make(chan struct{})
113-
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
114-
115112
for i := 1; i < 5; i++ {
116113
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
117114
if err != nil {
@@ -120,9 +117,16 @@ func TestResourceManagerServiceInbound(t *testing.T) {
120117
time.Sleep(10 * time.Millisecond)
121118
}
122119

120+
ready := make(chan struct{})
121+
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
122+
123+
var eg sync.WaitGroup
124+
echos[0].Done(eg.Done)
125+
123126
var once sync.Once
124127
var wg sync.WaitGroup
125128
for i := 1; i < 5; i++ {
129+
eg.Add(1)
126130
wg.Add(1)
127131
go func(i int) {
128132
defer wg.Done()
@@ -137,6 +141,7 @@ func TestResourceManagerServiceInbound(t *testing.T) {
137141
}(i)
138142
}
139143
wg.Wait()
144+
eg.Wait()
140145

141146
checkEchoStatus(t, echos[0], EchoStatus{
142147
StreamsIn: 4,
@@ -157,11 +162,6 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
157162
defer closeEchos(echos)
158163
defer closeRcmgrs(echos)
159164

160-
count := new(int32)
161-
ready := make(chan struct{})
162-
*count = 4
163-
echos[0].BeforeDone(waitForBarrier(count, ready, time.Minute))
164-
165165
for i := 1; i < 5; i++ {
166166
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
167167
if err != nil {
@@ -170,8 +170,14 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
170170
time.Sleep(10 * time.Millisecond)
171171
}
172172

173+
echos[0].BeforeDone(waitForBarrier(4, time.Minute))
174+
175+
var eg sync.WaitGroup
176+
echos[0].Done(eg.Done)
177+
173178
var wg sync.WaitGroup
174179
for i := 1; i < 5; i++ {
180+
eg.Add(1)
175181
wg.Add(1)
176182
go func(i int) {
177183
defer wg.Done()
@@ -183,6 +189,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
183189
}(i)
184190
}
185191
wg.Wait()
192+
eg.Wait()
186193

187194
checkEchoStatus(t, echos[0], EchoStatus{
188195
StreamsIn: 4,
@@ -191,11 +198,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
191198
ResourceServiceErrors: 0,
192199
})
193200

194-
ready = make(chan struct{})
201+
ready := make(chan struct{})
195202
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
196203

197204
var once sync.Once
198205
for i := 0; i < 3; i++ {
206+
eg.Add(1)
199207
wg.Add(1)
200208
go func() {
201209
defer wg.Done()
@@ -210,6 +218,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
210218
}()
211219
}
212220
wg.Wait()
221+
eg.Wait()
213222

214223
checkEchoStatus(t, echos[0], EchoStatus{
215224
StreamsIn: 7,
@@ -219,9 +228,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
219228
})
220229
}
221230

222-
func waitForBarrier(count *int32, ready chan struct{}, timeout time.Duration) func() error {
231+
func waitForBarrier(count int32, timeout time.Duration) func() error {
232+
ready := make(chan struct{})
233+
wait := new(int32)
234+
*wait = count
223235
return func() error {
224-
if atomic.AddInt32(count, -1) == 0 {
236+
if atomic.AddInt32(wait, -1) == 0 {
225237
close(ready)
226238
}
227239

0 commit comments

Comments
 (0)