@@ -2,14 +2,15 @@
package decision

import (
+	"cmp"
	"context"
+	"errors"
	"fmt"
-	"math/bits"
+	"slices"
	"sync"
	"time"

	"github.com/google/uuid"
-
	wl "github.com/ipfs/boxo/bitswap/client/wantlist"
	"github.com/ipfs/boxo/bitswap/internal/defaults"
	bsmsg "github.com/ipfs/boxo/bitswap/message"
@@ -132,9 +133,11 @@ type PeerEntry struct {
// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
	// Wants informs the ledger that [peer.ID] wants [wl.Entry].
-	Wants(p peer.ID, e wl.Entry)
+	// If adding the entry would exceed the ledger's internal limit, then the
+	// entry is not added and false is returned.
+	Wants(p peer.ID, e wl.Entry) bool

-	// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
+	// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
	CancelWant(p peer.ID, k cid.Cid) bool

	// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
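To make the new `Wants` contract concrete, here is a minimal sketch of a size-limited ledger. It is hypothetical, not the real `DefaultPeerLedger` (which also tracks which peers want each CID); it only illustrates the accept/reject behavior:

```go
package decision

import (
	wl "github.com/ipfs/boxo/bitswap/client/wantlist"
	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
)

// boundedLedger is a hypothetical PeerLedger-style type, for illustration only.
type boundedLedger struct {
	limit int
	wants map[peer.ID]map[cid.Cid]wl.Entry
}

// Wants adds e to p's wantlist and reports true. It reports false, leaving
// the ledger unchanged, when the wantlist is already at the limit and e.Cid
// is not already present (updating an existing entry never fails).
func (l *boundedLedger) Wants(p peer.ID, e wl.Entry) bool {
	pw, ok := l.wants[p]
	if !ok {
		pw = make(map[cid.Cid]wl.Entry)
		l.wants[p] = pw
	}
	if _, exists := pw[e.Cid]; !exists && len(pw) >= l.limit {
		return false // over the limit; the caller must treat e as overflow
	}
	pw[e.Cid] = e // add or update
	return true
}
```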
@@ -315,8 +318,11 @@ func WithMaxOutstandingBytesPerPeer(count int) Option {
	}
}

-// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
-// If a peer send us more than this we will truncate newest entries.
+// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each
+// peer is allowed to send. If a peer sends more than this, then the lowest
+// priority entries are truncated to this limit. If there is insufficient space
+// to enqueue new entries, then older existing wants with no associated blocks,
+// and lower priority wants, are canceled to make room for the new wants.
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
	return func(e *Engine) {
		e.maxQueuedWantlistEntriesPerPeer = count
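As a usage sketch, the `With*` helpers follow the functional-options pattern: each `Option` is a function that mutates the `Engine` under construction. The `applyOptions` helper below is hypothetical (the real engine applies options inside `newEngine`):

```go
// applyOptions is a hypothetical helper showing how Option values compose.
func applyOptions(e *Engine, opts ...Option) *Engine {
	for _, opt := range opts {
		opt(e) // each Option mutates the engine in place
	}
	return e
}
```

For example, `applyOptions(e, WithMaxQueuedWantlistEntriesPerPeer(2048))` caps each peer at 2048 queued wantlist entries.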
@@ -402,7 +408,6 @@ func newEngine(
		taskWorkerCount:   defaults.BitswapEngineTaskWorkerCount,
		sendDontHaves:     true,
		self:              self,
-		peerLedger:        NewDefaultPeerLedger(),
		pendingGauge:      bmetrics.PendingEngineGauge(ctx),
		activeGauge:       bmetrics.ActiveEngineGauge(ctx),
		targetMessageSize: defaultTargetMessageSize,
@@ -416,6 +421,11 @@ func newEngine(
		opt(e)
	}

+	// If peerLedger was not set by option, then create a default instance.
+	if e.peerLedger == nil {
+		e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer)
+	}
+
	e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx))

	// default peer task queue options
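The nil check runs after the options loop so that an injected ledger wins over the default, and so the default ledger sees the final value of `maxQueuedWantlistEntriesPerPeer`. A hypothetical test-only option makes the ordering visible (no such exported option exists in this diff):

```go
// withPeerLedger is a hypothetical, unexported option for tests. Because it
// sets e.peerLedger before the nil check in newEngine runs, the injected
// ledger is kept and NewDefaultPeerLedger is never called.
func withPeerLedger(l PeerLedger) Option {
	return func(e *Engine) { e.peerLedger = l }
}
```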
@@ -676,14 +686,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
		return false
	}

-	newWorkExists := false
-	defer func() {
-		if newWorkExists {
-			e.signalNewWork()
-		}
-	}()
-
-	wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
+	wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
+	if err != nil {
+		// This is a truly broken client, let's kill the connection.
+		log.Warnw(err.Error(), "local", e.self, "remote", p)
+		return true
+	}

	// Get block sizes
	wantKs := cid.NewSet()
@@ -702,56 +710,35 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
		e.peerLedger.ClearPeerWantlist(p)
	}

-	s := uint(e.peerLedger.WantlistSizeForPeer(p))
-	if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
-		log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
-		// truncate wantlist to avoid overflow
-		available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
-		if o != 0 {
-			available = 0
+	var overflow []bsmsg.Entry
+	if len(wants) != 0 {
+		filteredWants := wants[:0] // shift inplace
+		for _, entry := range wants {
+			if !e.peerLedger.Wants(p, entry.Entry) {
+				// Cannot add entry because it would exceed the size limit.
+				overflow = append(overflow, entry)
+				continue
+			}
+			filteredWants = append(filteredWants, entry)
		}
-		wants = wants[:available]
+		// Clear truncated entries - early GC.
+		clear(wants[len(filteredWants):])
+		wants = filteredWants
	}

-	filteredWants := wants[:0] // shift inplace
-
-	for _, entry := range wants {
-		if entry.Cid.Prefix().MhType == mh.IDENTITY {
-			// This is a truely broken client, let's kill the connection.
-			e.lock.Unlock()
-			log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
-			return true
-		}
-		if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
-			// Ignore requests about CIDs that big.
-			continue
-		}
-
-		e.peerLedger.Wants(p, entry.Entry)
-		filteredWants = append(filteredWants, entry)
+	if len(overflow) != 0 {
+		log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
+		wants = e.handleOverflow(ctx, p, overflow, wants)
	}
-	// Clear truncated entries - early GC.
-	clear(wants[len(filteredWants):])

-	wants = filteredWants
	for _, entry := range cancels {
		c := entry.Cid
-		if c.Prefix().MhType == mh.IDENTITY {
-			// This is a truely broken client, let's kill the connection.
-			e.lock.Unlock()
-			log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
-			return true
-		}
-		if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
-			// Ignore requests about CIDs that big.
-			continue
-		}
-
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c)
		if e.peerLedger.CancelWant(p, c) {
			e.peerRequestQueue.Remove(c, p)
		}
	}
+
	e.lock.Unlock()

	var activeEntries []peertask.Task
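The `wants[:0]` loop above is the shift-in-place filter idiom: matching elements are kept in the original backing array, and the abandoned tail is zeroed so its elements can be garbage collected early. A generic, self-contained sketch of the same idiom (Go 1.21+ for the `clear` builtin):

```go
// filterInPlace keeps the elements of s satisfying keep, reusing s's
// backing array, then zeroes the unused tail to release its references.
func filterInPlace[T any](s []T, keep func(T) bool) []T {
	kept := s[:0]
	for _, v := range s {
		if keep(v) {
			kept = append(kept, v)
		}
	}
	clear(s[len(kept):]) // early GC of the discarded elements
	return kept
}
```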
@@ -761,21 +748,14 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
		// Only add the task to the queue if the requester wants a DONT_HAVE
		if e.sendDontHaves && entry.SendDontHave {
			c := entry.Cid
-
-			newWorkExists = true
-			isWantBlock := false
-			if entry.WantType == pb.Message_Wantlist_Block {
-				isWantBlock = true
-			}
-
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     bsmsg.BlockPresenceSize(c),
				Data: &taskData{
					BlockSize:    0,
					HaveBlock:    false,
-					IsWantBlock:  isWantBlock,
+					IsWantBlock:  entry.WantType == pb.Message_Wantlist_Block,
					SendDontHave: entry.SendDontHave,
				},
			})
@@ -800,8 +780,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
			continue
		}
		// The block was found, add it to the queue
-		newWorkExists = true
-
		isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

		log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
@@ -827,19 +805,96 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
		})
	}

-	// Push entries onto the request queue
-	if len(activeEntries) > 0 {
+	// Push entries onto the request queue and signal the network that new work is ready.
+	if len(activeEntries) != 0 {
		e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
		e.updateMetrics()
+		e.signalNewWork()
	}
	return false
}

+// handleOverflow processes incoming wants that could not be added to the peer
+// ledger without exceeding the peer want limit. These are handled by trying to
+// make room by canceling existing wants for which there is no block. If this
+// does not make sufficient room, then any lower priority wants that have
+// blocks are canceled.
+//
+// Important: handleOverflow must be called while e.lock is held.
+func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry {
+	// Sort overflow from most to least important.
+	slices.SortFunc(overflow, func(a, b bsmsg.Entry) int {
+		return cmp.Compare(b.Entry.Priority, a.Entry.Priority)
+	})
+	// Sort existing wants from least to most important, to try to replace
+	// lowest priority items first.
+	existingWants := e.peerLedger.WantlistForPeer(p)
+	slices.SortFunc(existingWants, func(a, b wl.Entry) int {
+		return cmp.Compare(a.Priority, b.Priority)
+	})
+
+	queuedWantKs := cid.NewSet()
+	for _, entry := range existingWants {
+		queuedWantKs.Add(entry.Cid)
+	}
+	queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys())
+	if err != nil {
+		log.Info("aborting overflow processing", err)
+		return wants
+	}
+
+	// Remove entries for blocks that are not present to make room for overflow.
+	var removed []int
+	for i, w := range existingWants {
+		if _, found := queuedBlockSizes[w.Cid]; !found {
+			// Cancel lowest priority dont-have.
+			if e.peerLedger.CancelWant(p, w.Cid) {
+				e.peerRequestQueue.Remove(w.Cid, p)
+			}
+			removed = append(removed, i)
+			// Pop highest priority overflow.
+			firstOver := overflow[0]
+			overflow = overflow[1:]
+			// Add highest priority overflow to wants.
+			e.peerLedger.Wants(p, firstOver.Entry)
+			wants = append(wants, firstOver)
+			if len(overflow) == 0 {
+				return wants
+			}
+		}
+	}
+
+	// Replace existing entries that have lower priority than the overflow
+	// entries.
+	var replace int
+	for _, overflowEnt := range overflow {
+		// Do not compare with a removed existingWants entry.
+		for len(removed) != 0 && replace == removed[0] {
+			replace++
+			removed = removed[1:]
+		}
+		if overflowEnt.Entry.Priority < existingWants[replace].Priority {
+			// All remaining overflow entries have too low a priority to
+			// replace any existing wants.
+			break
+		}
+		entCid := existingWants[replace].Cid
+		replace++
+		if e.peerLedger.CancelWant(p, entCid) {
+			e.peerRequestQueue.Remove(entCid, p)
+		}
+		e.peerLedger.Wants(p, overflowEnt.Entry)
+		wants = append(wants, overflowEnt)
+	}
+
+	return wants
+}
+
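To see the two sort directions in `handleOverflow` concretely, here is a runnable sketch using bare priorities (illustrative values only): overflow is scanned best-first while existing wants are scanned worst-first, so each comparison pits the best remaining overflow entry against the cheapest surviving want.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

func main() {
	// Overflow sorts most-to-least important: cmp.Compare(b, a) descends.
	overflow := []int32{10, 40, 20}
	slices.SortFunc(overflow, func(a, b int32) int { return cmp.Compare(b, a) })
	fmt.Println(overflow) // [40 20 10]

	// Existing wants sort least-to-most important: cmp.Compare(a, b) ascends,
	// so the replace loop reaches the lowest priority entries first.
	existing := []int32{30, 5, 25}
	slices.SortFunc(existing, func(a, b int32) int { return cmp.Compare(a, b) })
	fmt.Println(existing) // [5 25 30]

	// An overflow entry replaces existing[i] only while its priority is not
	// lower; once the best overflow entry loses, none of the rest can win.
}
```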
// Split the want-have entries from the cancel and deny entries.
-func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) {
+func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) {
	entries := m.Wantlist() // creates copy; safe to modify
	if len(entries) == 0 {
-		return nil, nil, nil
+		return nil, nil, nil, nil
	}

	log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
@@ -848,23 +903,35 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
	var cancels, denials []bsmsg.Entry

	for _, et := range entries {
+		c := et.Cid
+		if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
+			// Ignore requests about CIDs that big.
+			continue
+		}
+		if c.Prefix().MhType == mh.IDENTITY {
+			return nil, nil, nil, errors.New("peer sent an identity CID")
+		}
+
		if et.Cancel {
			cancels = append(cancels, et)
			continue
		}

		if et.WantType == pb.Message_Wantlist_Have {
-			log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
+			log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
		} else {
-			log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
+			log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
		}

-		if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) {
+		if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
			denials = append(denials, et)
			continue
		}

-		wants = append(wants, et)
+		// Do not take more wants than can be handled.
+		if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
+			wants = append(wants, et)
+		}
	}

	if len(wants) == 0 {
@@ -874,7 +941,7 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
	// Clear truncated entries.
	clear(entries[len(wants):])

-	return wants, cancels, denials
+	return wants, cancels, denials, nil
}

// ReceivedBlocks is called when new blocks are received from the network.
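For background on the identity-CID rejection in `splitWantsCancelsDenials`: an identity multihash embeds the payload directly in the CID, so a peer that wants or cancels such a CID already has the data and never needed block exchange. A small sketch of the check (illustrative only):

```go
package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

func main() {
	// Build an identity-hashed CID: the "digest" is the data itself.
	sum, err := mh.Sum([]byte("hello"), mh.IDENTITY, -1)
	if err != nil {
		panic(err)
	}
	c := cid.NewCidV1(cid.Raw, sum)

	// The engine's check: identity CIDs never require block exchange.
	fmt.Println(c.Prefix().MhType == mh.IDENTITY) // true
}
```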