Skip to content

Commit 88c039a

Browse files
committed
query: fix race condition: redux
this time just move to goprocess
1 parent: fc22b79 · commit: 88c039a

File tree

2 files changed

+39
-37
lines changed

2 files changed

+39
-37
lines changed

routing/dht/ext_test.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ func TestGetFailures(t *testing.T) {
5252
err = merr[0]
5353
}
5454

55-
if err != context.DeadlineExceeded {
55+
if err != context.DeadlineExceeded && err != context.Canceled {
5656
t.Fatal("Got different error than we expected", err)
5757
}
5858
} else {

routing/dht/query.go

+38-36
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
package dht
22

33
import (
4-
"error"
4+
"errors"
55
"sync"
66

77
notif "github.com/jbenet/go-ipfs/notifications"
@@ -13,7 +13,8 @@ import (
1313
pset "github.com/jbenet/go-ipfs/util/peerset"
1414
todoctr "github.com/jbenet/go-ipfs/util/todocounter"
1515

16-
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
16+
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
17+
ctxproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
1718
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
1819
)
1920

@@ -63,7 +64,7 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e
6364
defer cancel()
6465

6566
runner := newQueryRunner(ctx, q)
66-
return runner.Run(peers)
67+
return runner.Run(ctx, peers)
6768
}
6869

6970
type dhtQueryRunner struct {
@@ -78,7 +79,7 @@ type dhtQueryRunner struct {
7879
rateLimit chan struct{} // processing semaphore
7980
log eventlog.EventLogger
8081

81-
cg ctxgroup.ContextGroup
82+
proc process.Process
8283
sync.RWMutex
8384
}
8485

@@ -89,11 +90,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
8990
peersRemaining: todoctr.NewSyncCounter(),
9091
peersSeen: pset.New(),
9192
rateLimit: make(chan struct{}, q.concurrency),
92-
cg: ctxgroup.WithContext(ctx),
93+
proc: process.WithParent(process.Background()),
9394
}
9495
}
9596

96-
func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
97+
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
9798
r.log = log
9899

99100
if len(peers) == 0 {
@@ -107,31 +108,30 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
107108

108109
// add all the peers we got first.
109110
for _, p := range peers {
110-
r.addPeerToQuery(r.cg.Context(), p)
111-
}
112-
113-
// may be closed already. this caused an odd race (where we attempt to
114-
// add a child to an already closed ctxgroup). this is a temp workaround
115-
// as we'll switch to using a proc here soon.
116-
select {
117-
case <-r.cg.Closed():
118-
return nil, r.cg.Context().Err()
119-
default:
111+
r.addPeerToQuery(p)
120112
}
121113

122114
// go do this thing.
123-
// do it as a child func to make sure Run exits
115+
// do it as a child proc to make sure Run exits
124116
// ONLY AFTER spawn workers has exited.
125-
r.cg.AddChildFunc(r.spawnWorkers)
117+
r.proc.Go(r.spawnWorkers)
126118

127119
// so workers are working.
128120

129121
// wait until they're done.
130122
err := routing.ErrNotFound
131123

124+
// now, if the context finishes, close the proc.
125+
// we have to do it here because the logic before is setup, which
126+
// should run without closing the proc.
127+
go func() {
128+
<-ctx.Done()
129+
r.proc.Close()
130+
}()
131+
132132
select {
133133
case <-r.peersRemaining.Done():
134-
r.cg.Close()
134+
r.proc.Close()
135135
r.RLock()
136136
defer r.RUnlock()
137137

@@ -143,12 +143,10 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
143143
err = r.errs[0]
144144
}
145145

146-
case <-r.cg.Closed():
147-
log.Debug("r.cg.Closed()")
148-
146+
case <-r.proc.Closed():
149147
r.RLock()
150148
defer r.RUnlock()
151-
err = r.cg.Context().Err() // collect the error.
149+
err = context.DeadlineExceeded
152150
}
153151

154152
if r.result != nil && r.result.success {
@@ -158,7 +156,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
158156
return nil, err
159157
}
160158

161-
func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
159+
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
162160
// if new peer is ourselves...
163161
if next == r.query.dht.self {
164162
r.log.Debug("addPeerToQuery skip self")
@@ -172,18 +170,18 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
172170
r.peersRemaining.Increment(1)
173171
select {
174172
case r.peersToQuery.EnqChan <- next:
175-
case <-ctx.Done():
173+
case <-r.proc.Closing():
176174
}
177175
}
178176

179-
func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
177+
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
180178
for {
181179

182180
select {
183181
case <-r.peersRemaining.Done():
184182
return
185183

186-
case <-r.cg.Closing():
184+
case <-r.proc.Closing():
187185
return
188186

189187
case p, more := <-r.peersToQuery.DeqChan:
@@ -193,24 +191,27 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
193191

194192
// do it as a child func to make sure Run exits
195193
// ONLY AFTER spawn workers has exited.
196-
parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
197-
r.queryPeer(cg, p)
194+
proc.Go(func(proc process.Process) {
195+
r.queryPeer(proc, p)
198196
})
199197
}
200198
}
201199
}
202200

203-
func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
201+
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
204202
// make sure we rate limit concurrency.
205203
select {
206204
case <-r.rateLimit:
207-
case <-cg.Closing():
205+
case <-proc.Closing():
208206
r.peersRemaining.Decrement(1)
209207
return
210208
}
211209

212210
// ok let's do this!
213211

212+
// create a context from our proc.
213+
ctx := ctxproc.WithProcessClosing(context.Background(), proc)
214+
214215
// make sure we do this when we exit
215216
defer func() {
216217
// signal we're done processing peer p
@@ -227,10 +228,11 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
227228
r.rateLimit <- struct{}{}
228229

229230
pi := peer.PeerInfo{ID: p}
230-
if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
231+
232+
if err := r.query.dht.host.Connect(ctx, pi); err != nil {
231233
log.Debugf("Error connecting: %s", err)
232234

233-
notif.PublishQueryEvent(cg.Context(), &notif.QueryEvent{
235+
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
234236
Type: notif.QueryError,
235237
Extra: err.Error(),
236238
})
@@ -246,7 +248,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
246248
}
247249

248250
// finally, run the query against this peer
249-
res, err := r.query.qfunc(cg.Context(), p)
251+
res, err := r.query.qfunc(ctx, p)
250252

251253
if err != nil {
252254
log.Debugf("ERROR worker for: %v %v", p, err)
@@ -259,7 +261,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
259261
r.Lock()
260262
r.result = res
261263
r.Unlock()
262-
go r.cg.Close() // signal to everyone that we're done.
264+
go r.proc.Close() // signal to everyone that we're done.
263265
// must be async, as we're one of the children, and Close blocks.
264266

265267
} else if len(res.closerPeers) > 0 {
@@ -272,7 +274,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
272274

273275
// add their addresses to the dialer's peerstore
274276
r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, peer.TempAddrTTL)
275-
r.addPeerToQuery(cg.Context(), next.ID)
277+
r.addPeerToQuery(next.ID)
276278
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
277279
}
278280
} else {

0 commit comments

Comments (0)