Commit c2bbca0

query: fix race condition: redux
this time just move to goprocess
1 parent 7a03d67 commit c2bbca0
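
The race in question: under go-ctxgroup, a worker could attempt to add a child to a ContextGroup that had already closed (the removed workaround comment in query.go below notes exactly this). goprocess makes the teardown order explicit: children are spawned with Go(), shutdown is observed via Closing()/Closed(), and Close() blocks until every child has returned. Below is a minimal, runnable sketch of that lifecycle. It is illustrative only: it uses the upstream goprocess import paths rather than the Godeps-vendored ones in the diff, and the worker body and timings are made up, not from this commit.

// Sketch of the goprocess lifecycle this commit moves to (assumed upstream
// import paths; worker and timings are illustrative).
package main

import (
	"context"
	"fmt"
	"time"

	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
)

func main() {
	// the runner owns its own process, detached from any caller context.
	proc := process.WithParent(process.Background())

	// children spawned via Go() are tracked: proc.Close() will not
	// return until every one of them has exited.
	proc.Go(func(p process.Process) {
		// derive a context that is cancelled once p starts closing,
		// for APIs (dials, queries) that take a context.Context.
		ctx := ctxproc.WithProcessClosing(context.Background(), p)
		select {
		case <-time.After(time.Second):
			fmt.Println("worker finished")
		case <-ctx.Done():
			fmt.Println("worker cancelled")
		}
	})

	// bridge an external context in: when the caller's context fires,
	// close the whole process tree (this mirrors the go func added in Run).
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	go func() {
		<-ctx.Done()
		proc.Close() // blocks until the worker above returns
	}()

	<-proc.Closed() // fully torn down
}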

2 files changed: +42 -38 lines
routing/dht/ext_test.go (+1 -1)
@@ -52,7 +52,7 @@ func TestGetFailures(t *testing.T) {
 			err = merr[0]
 		}

-		if err != context.DeadlineExceeded {
+		if err != context.DeadlineExceeded && err != context.Canceled {
 			t.Fatal("Got different error than we expected", err)
 		}
 	} else {
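
The assertion is relaxed because the error class now depends on how the caller's context ended: an elapsed deadline surfaces context.DeadlineExceeded, while an explicit cancellation surfaces context.Canceled. A stdlib-only sketch of the two cases:

// Why the test now accepts context.Canceled too: the two context error
// classes, demonstrated with the standard library alone.
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// deadline elapses -> context.DeadlineExceeded
	d, cancelD := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancelD()
	<-d.Done()
	fmt.Println(d.Err() == context.DeadlineExceeded) // true

	// explicit cancel -> context.Canceled
	c, cancelC := context.WithCancel(context.Background())
	cancelC()
	fmt.Println(c.Err() == context.Canceled) // true
}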

routing/dht/query.go (+41 -37)
@@ -12,7 +12,8 @@ import (
 	pset "github.com/jbenet/go-ipfs/util/peerset"
 	todoctr "github.com/jbenet/go-ipfs/util/todocounter"

-	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
+	ctxproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 )
@@ -61,8 +62,8 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	runner := newQueryRunner(ctx, q)
-	return runner.Run(peers)
+	runner := newQueryRunner(q)
+	return runner.Run(ctx, peers)
 }

 type dhtQueryRunner struct {
@@ -77,22 +78,24 @@ type dhtQueryRunner struct {
 	rateLimit chan struct{} // processing semaphore
 	log       eventlog.EventLogger

-	cg ctxgroup.ContextGroup
+	proc process.Process
 	sync.RWMutex
 }

-func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
+func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
+	proc := process.WithParent(process.Background())
+	ctx := ctxproc.WithProcessClosing(context.Background(), proc)
 	return &dhtQueryRunner{
 		query:          q,
 		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
 		peersRemaining: todoctr.NewSyncCounter(),
 		peersSeen:      pset.New(),
 		rateLimit:      make(chan struct{}, q.concurrency),
-		cg:             ctxgroup.WithContext(ctx),
+		proc:           proc,
 	}
 }

-func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
+func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
 	r.log = log

 	if len(peers) == 0 {
@@ -107,31 +110,30 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {

 	// add all the peers we got first.
 	for _, p := range peers {
-		r.addPeerToQuery(r.cg.Context(), p)
-	}
-
-	// may be closed already. this caused an odd race (where we attempt to
-	// add a child to an already closed ctxgroup). this is a temp workaround
-	// as we'll switch to using a proc here soon.
-	select {
-	case <-r.cg.Closed():
-		return nil, r.cg.Context().Err()
-	default:
+		r.addPeerToQuery(p)
 	}

 	// go do this thing.
-	// do it as a child func to make sure Run exits
+	// do it as a child proc to make sure Run exits
 	// ONLY AFTER spawn workers has exited.
-	r.cg.AddChildFunc(r.spawnWorkers)
+	r.proc.Go(r.spawnWorkers)

 	// so workers are working.

 	// wait until they're done.
 	err := routing.ErrNotFound

+	// now, if the context finishes, close the proc.
+	// we have to do it here because the logic before is setup, which
+	// should run without closing the proc.
+	go func() {
+		<-ctx.Done()
+		r.proc.Close()
+	}()
+
 	select {
 	case <-r.peersRemaining.Done():
-		r.cg.Close()
+		r.proc.Close()
 		r.RLock()
 		defer r.RUnlock()

@@ -143,12 +145,10 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 			err = r.errs[0]
 		}

-	case <-r.cg.Closed():
-		log.Debug("r.cg.Closed()")
-
+	case <-r.proc.Closed():
 		r.RLock()
 		defer r.RUnlock()
-		err = r.cg.Context().Err() // collect the error.
+		err = context.DeadlineExceeded
 	}

 	if r.result != nil && r.result.success {
@@ -158,7 +158,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 	return nil, err
 }

-func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
+func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
 	// if new peer is ourselves...
 	if next == r.query.dht.self {
 		r.log.Debug("addPeerToQuery skip self")
@@ -172,18 +172,18 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
 	r.peersRemaining.Increment(1)
 	select {
 	case r.peersToQuery.EnqChan <- next:
-	case <-ctx.Done():
+	case <-r.proc.Closing():
 	}
 }

-func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
+func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
 	for {

 		select {
 		case <-r.peersRemaining.Done():
 			return

-		case <-r.cg.Closing():
+		case <-r.proc.Closing():
 			return

 		case p, more := <-r.peersToQuery.DeqChan:
@@ -193,24 +193,27 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {

 			// do it as a child func to make sure Run exits
 			// ONLY AFTER spawn workers has exited.
-			parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
-				r.queryPeer(cg, p)
+			proc.Go(func(proc process.Process) {
+				r.queryPeer(proc, p)
 			})
 		}
 	}
 }

-func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
+func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 	// make sure we rate limit concurrency.
 	select {
 	case <-r.rateLimit:
-	case <-cg.Closing():
+	case <-proc.Closing():
 		r.peersRemaining.Decrement(1)
 		return
 	}

 	// ok let's do this!

+	// create a context from our proc.
+	ctx := ctxproc.WithProcessClosing(context.Background(), proc)
+
 	// make sure we do this when we exit
 	defer func() {
 		// signal we're done proccessing peer p
@@ -227,10 +230,11 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 	r.rateLimit <- struct{}{}

 	pi := peer.PeerInfo{ID: p}
-	if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
+
+	if err := r.query.dht.host.Connect(ctx, pi); err != nil {
 		log.Debugf("Error connecting: %s", err)

-		notif.PublishQueryEvent(cg.Context(), &notif.QueryEvent{
+		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
 			Type:  notif.QueryError,
 			Extra: err.Error(),
 		})
@@ -246,7 +250,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 	}

 	// finally, run the query against this peer
-	res, err := r.query.qfunc(cg.Context(), p)
+	res, err := r.query.qfunc(ctx, p)

 	if err != nil {
 		log.Debugf("ERROR worker for: %v %v", p, err)
@@ -259,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 		r.Lock()
 		r.result = res
 		r.Unlock()
-		go r.cg.Close() // signal to everyone that we're done.
+		go r.proc.Close() // signal to everyone that we're done.
 		// must be async, as we're one of the children, and Close blocks.

 	} else if len(res.closerPeers) > 0 {
@@ -272,7 +276,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {

 		// add their addresses to the dialer's peerstore
 		r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, peer.TempAddrTTL)
-		r.addPeerToQuery(cg.Context(), next.ID)
+		r.addPeerToQuery(next.ID)
 		log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
 		}
 	} else {
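
One detail from the hunks above worth spelling out: the worker that finds a result signals shutdown with go r.proc.Close(), never a bare Close(). Close() waits for every child process to exit, and the worker is itself one of those children, so a synchronous call would wait on itself. A minimal sketch of the cycle being avoided, under the same goprocess assumptions as above (names are illustrative):

// Why "go r.proc.Close()": Close waits on all children, so a child that
// calls it synchronously waits on itself. Illustrative sketch only.
package main

import (
	"fmt"

	process "github.com/jbenet/goprocess"
)

func main() {
	proc := process.WithParent(process.Background())
	proc.Go(func(p process.Process) {
		// pretend we found the query result: tell everyone to stop.
		// proc.Close() here (without the go) would block until this
		// child returns -- which it can't, because it would be stuck
		// inside Close. Spawning the Close asynchronously breaks the cycle.
		go proc.Close()
	})
	<-proc.Closed()
	fmt.Println("torn down cleanly")
}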
