@@ -5,12 +5,13 @@ package bitswap
5
5
import (
6
6
"context"
7
7
"errors"
8
+
8
9
"sync"
9
10
"time"
10
11
11
- bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
12
12
delay "github.com/ipfs/go-ipfs-delay"
13
13
14
+ bsbpm "github.com/ipfs/go-bitswap/blockpresencemanager"
14
15
decision "github.com/ipfs/go-bitswap/decision"
15
16
bsgetter "github.com/ipfs/go-bitswap/getter"
16
17
bsmsg "github.com/ipfs/go-bitswap/message"
@@ -20,6 +21,7 @@ import (
20
21
bspm "github.com/ipfs/go-bitswap/peermanager"
21
22
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
22
23
bssession "github.com/ipfs/go-bitswap/session"
24
+ bssim "github.com/ipfs/go-bitswap/sessioninterestmanager"
23
25
bssm "github.com/ipfs/go-bitswap/sessionmanager"
24
26
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
25
27
bswm "github.com/ipfs/go-bitswap/wantmanager"
@@ -113,24 +115,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
113
115
return bsmq .New (ctx , p , network )
114
116
}
115
117
116
- wm := bswm .New (ctx , bspm .New (ctx , peerQueueFactory ))
118
+ sim := bssim .New ()
119
+ bpm := bsbpm .New ()
120
+ pm := bspm .New (ctx , peerQueueFactory , network .Self ())
121
+ wm := bswm .New (ctx , pm , sim , bpm )
117
122
pqm := bspqm .New (ctx , network )
118
123
119
- sessionFactory := func (ctx context.Context , id uint64 , pm bssession.PeerManager , srs bssession.RequestSplitter ,
124
+ sessionFactory := func (ctx context.Context , id uint64 , spm bssession.SessionPeerManager ,
125
+ sim * bssim.SessionInterestManager ,
126
+ pm bssession.PeerManager ,
127
+ bpm * bsbpm.BlockPresenceManager ,
120
128
notif notifications.PubSub ,
121
129
provSearchDelay time.Duration ,
122
- rebroadcastDelay delay.D ) bssm.Session {
123
- return bssession .New (ctx , id , wm , pm , srs , notif , provSearchDelay , rebroadcastDelay )
130
+ rebroadcastDelay delay.D ,
131
+ self peer.ID ) bssm.Session {
132
+ return bssession .New (ctx , id , wm , spm , sim , pm , bpm , notif , provSearchDelay , rebroadcastDelay , self )
124
133
}
125
- sessionPeerManagerFactory := func (ctx context.Context , id uint64 ) bssession.PeerManager {
134
+ sessionPeerManagerFactory := func (ctx context.Context , id uint64 ) bssession.SessionPeerManager {
126
135
return bsspm .New (ctx , id , network .ConnectionManager (), pqm )
127
136
}
128
- sessionRequestSplitterFactory := func (ctx context.Context ) bssession.RequestSplitter {
129
- return bssrs .New (ctx )
130
- }
131
137
notif := notifications .New ()
138
+ sm := bssm .New (ctx , sessionFactory , sim , sessionPeerManagerFactory , bpm , pm , notif , network .Self ())
139
+ wm .SetSessionManager (sm )
140
+ engine := decision .NewEngine (ctx , bstore , network .ConnectionManager (), network .Self ())
132
141
133
- engine := decision .NewEngine (ctx , bstore , network .ConnectionManager ()) // TODO close the engine with Close() method
134
142
bs := & Bitswap {
135
143
blockstore : bstore ,
136
144
engine : engine ,
@@ -139,8 +147,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
139
147
newBlocks : make (chan cid.Cid , HasBlockBufferSize ),
140
148
provideKeys : make (chan cid.Cid , provideKeysBufferSize ),
141
149
wm : wm ,
150
+ pm : pm ,
142
151
pqm : pqm ,
143
- sm : bssm .New (ctx , sessionFactory , sessionPeerManagerFactory , sessionRequestSplitterFactory , notif ),
152
+ sm : sm ,
153
+ sim : sim ,
144
154
notif : notif ,
145
155
counters : new (counters ),
146
156
dupMetric : dupHist ,
@@ -156,7 +166,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
156
166
option (bs )
157
167
}
158
168
159
- bs .wm .Startup ()
160
169
bs .pqm .Startup ()
161
170
network .SetDelegate (bs )
162
171
@@ -181,6 +190,8 @@ type Bitswap struct {
181
190
// the wantlist tracks global wants for bitswap
182
191
wm * bswm.WantManager
183
192
193
+ pm * bspm.PeerManager
194
+
184
195
// the provider query manager manages requests to find providers
185
196
pqm * bspqm.ProviderQueryManager
186
197
@@ -215,9 +226,13 @@ type Bitswap struct {
215
226
allMetric metrics.Histogram
216
227
sentHistogram metrics.Histogram
217
228
218
- // the sessionmanager manages tracking sessions
229
+ // the SessionManager routes requests to interested sessions
219
230
sm * bssm.SessionManager
220
231
232
+ // the SessionInterestManager keeps track of which sessions are interested
233
+ // in which CIDs
234
+ sim * bssim.SessionInterestManager
235
+
221
236
// whether or not to make provide announcements
222
237
provideEnabled bool
223
238
@@ -275,14 +290,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
275
290
// HasBlock announces the existence of a block to this bitswap service. The
276
291
// service will potentially notify its peers.
277
292
func (bs * Bitswap ) HasBlock (blk blocks.Block ) error {
278
- return bs .receiveBlocksFrom (context .Background (), "" , []blocks.Block {blk })
293
+ return bs .receiveBlocksFrom (context .Background (), "" , []blocks.Block {blk }, nil , nil )
279
294
}
280
295
281
296
// TODO: Some of this stuff really only needs to be done when adding a block
282
297
// from the user, not when receiving it from the network.
283
298
// In case you run `git blame` on this comment, I'll save you some time: ask
284
299
// @whyrusleeping, I don't know the answers you seek.
285
- func (bs * Bitswap ) receiveBlocksFrom (ctx context.Context , from peer.ID , blks []blocks.Block ) error {
300
+ func (bs * Bitswap ) receiveBlocksFrom (ctx context.Context , from peer.ID , blks []blocks.Block , haves []cid. Cid , dontHaves []cid. Cid ) error {
286
301
select {
287
302
case <- bs .process .Closing ():
288
303
return errors .New ("bitswap is closed" )
@@ -293,22 +308,20 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
293
308
294
309
// If blocks came from the network
295
310
if from != "" {
296
- // Split blocks into wanted blocks vs duplicates
297
- wanted = make ([]blocks.Block , 0 , len (blks ))
298
- for _ , b := range blks {
299
- if bs .sm .IsWanted (b .Cid ()) {
300
- wanted = append (wanted , b )
301
- } else {
302
- log .Debugf ("[recv] block not in wantlist; cid=%s, peer=%s" , b .Cid (), from )
303
- }
311
+ var notWanted []blocks.Block
312
+ wanted , notWanted = bs .sim .SplitWantedUnwanted (blks )
313
+ for _ , b := range notWanted {
314
+ log .Debugf ("[recv] block not in wantlist; cid=%s, peer=%s" , b .Cid (), from )
304
315
}
305
316
}
306
317
307
318
// Put wanted blocks into blockstore
308
- err := bs .blockstore .PutMany (wanted )
309
- if err != nil {
310
- log .Errorf ("Error writing %d blocks to datastore: %s" , len (wanted ), err )
311
- return err
319
+ if len (wanted ) > 0 {
320
+ err := bs .blockstore .PutMany (wanted )
321
+ if err != nil {
322
+ log .Errorf ("Error writing %d blocks to datastore: %s" , len (wanted ), err )
323
+ return err
324
+ }
312
325
}
313
326
314
327
// NOTE: There exists the possiblity for a race condition here. If a user
@@ -322,33 +335,25 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
322
335
allKs = append (allKs , b .Cid ())
323
336
}
324
337
325
- wantedKs := allKs
326
- if len (blks ) != len (wanted ) {
327
- wantedKs = make ([]cid.Cid , 0 , len (wanted ))
328
- for _ , b := range wanted {
329
- wantedKs = append (wantedKs , b .Cid ())
330
- }
331
- }
332
-
333
338
// Send all block keys (including duplicates) to any sessions that want them.
334
339
// (The duplicates are needed by sessions for accounting purposes)
335
- bs .sm .ReceiveFrom (from , allKs )
340
+ bs .wm .ReceiveFrom (ctx , from , allKs , haves , dontHaves )
336
341
337
- // Send wanted block keys to decision engine
338
- bs .engine .AddBlocks ( wantedKs )
342
+ // Send wanted blocks to decision engine
343
+ bs .engine .ReceiveFrom ( from , wanted , haves )
339
344
340
345
// Publish the block to any Bitswap clients that had requested blocks.
341
- // (the sessions use this pubsub mechanism to inform clients of received
346
+ // (the sessions use this pubsub mechanism to inform clients of incoming
342
347
// blocks)
343
348
for _ , b := range wanted {
344
349
bs .notif .Publish (b )
345
350
}
346
351
347
352
// If the reprovider is enabled, send wanted blocks to reprovider
348
353
if bs .provideEnabled {
349
- for _ , k := range wantedKs {
354
+ for _ , blk := range wanted {
350
355
select {
351
- case bs .newBlocks <- k :
356
+ case bs .newBlocks <- blk . Cid () :
352
357
// send block off to be reprovided
353
358
case <- bs .process .Closing ():
354
359
return bs .process .Close ()
@@ -380,20 +385,22 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
380
385
381
386
iblocks := incoming .Blocks ()
382
387
383
- if len (iblocks ) == 0 {
384
- return
385
- }
386
-
387
- bs .updateReceiveCounters (iblocks )
388
- for _ , b := range iblocks {
389
- log .Debugf ("[recv] block; cid=%s, peer=%s" , b .Cid (), p )
388
+ if len (iblocks ) > 0 {
389
+ bs .updateReceiveCounters (iblocks )
390
+ for _ , b := range iblocks {
391
+ log .Debugf ("[recv] block; cid=%s, peer=%s" , b .Cid (), p )
392
+ }
390
393
}
391
394
392
- // Process blocks
393
- err := bs .receiveBlocksFrom (ctx , p , iblocks )
394
- if err != nil {
395
- log .Warningf ("ReceiveMessage recvBlockFrom error: %s" , err )
396
- return
395
+ haves := incoming .Haves ()
396
+ dontHaves := incoming .DontHaves ()
397
+ if len (iblocks ) > 0 || len (haves ) > 0 || len (dontHaves ) > 0 {
398
+ // Process blocks
399
+ err := bs .receiveBlocksFrom (ctx , p , iblocks , haves , dontHaves )
400
+ if err != nil {
401
+ log .Warningf ("ReceiveMessage recvBlockFrom error: %s" , err )
402
+ return
403
+ }
397
404
}
398
405
}
399
406
@@ -479,12 +486,12 @@ func (bs *Bitswap) Close() error {
479
486
480
487
// GetWantlist returns the current local wantlist.
481
488
func (bs * Bitswap ) GetWantlist () []cid.Cid {
482
- entries := bs .wm .CurrentWants ()
483
- out := make ([]cid. Cid , 0 , len ( entries ))
484
- for _ , e := range entries {
485
- out = append ( out , e . Cid )
486
- }
487
- return out
489
+ return bs .pm .CurrentWants ()
490
+ }
491
+
492
+ // GetWanthaves returns the current list of want-haves.
493
+ func ( bs * Bitswap ) GetWantHaves () []cid. Cid {
494
+ return bs . pm . CurrentWantHaves ()
488
495
}
489
496
490
497
// IsOnline is needed to match go-ipfs-exchange-interface
0 commit comments