Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 91c3c55

Browse files
committed
feat: add peer block filter option
This feature lets a user configure a function that will allow / deny request for a block coming from a peer.
1 parent ada55fc commit 91c3c55

File tree

3 files changed

+107
-2
lines changed

3 files changed

+107
-2
lines changed

bitswap.go

+5
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func WithTargetMessageSize(tms int) Option {
156156

157157
type TaskInfo = decision.TaskInfo
158158
type TaskComparator = decision.TaskComparator
159+
type PeerBlockRequestFilter = decision.PeerBlockRequestFilter
159160

160161
// WithTaskComparator configures custom task prioritization logic.
161162
func WithTaskComparator(comparator TaskComparator) Option {
@@ -291,6 +292,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
291292
activeBlocksGauge,
292293
decision.WithTaskComparator(bs.taskComparator),
293294
decision.WithTargetMessageSize(bs.engineTargetMessageSize),
295+
decision.WithPeerBlockRequestFilter(bs.peerBlockRequestFilter),
294296
)
295297
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)
296298

@@ -399,6 +401,9 @@ type Bitswap struct {
399401
simulateDontHavesOnTimeout bool
400402

401403
taskComparator TaskComparator
404+
405+
// an optional feature to accept / deny HAVE - DONT HAVE requests
406+
peerBlockRequestFilter PeerBlockRequestFilter
402407
}
403408

404409
type counters struct {

internal/decision/engine.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ type Engine struct {
180180
metricUpdateCounter int
181181

182182
taskComparator TaskComparator
183+
184+
peerBlockRequestFilter PeerBlockRequestFilter
183185
}
184186

185187
// TaskInfo represents the details of a request from a peer.
@@ -201,6 +203,10 @@ type TaskInfo struct {
201203
// It should return true if task 'ta' has higher priority than task 'tb'
202204
type TaskComparator func(ta, tb *TaskInfo) bool
203205

206+
// PeerBlockRequestFilter is used to accept / deny requests for a CID coming from a PeerID
207+
// It should return true if the request should be fullfilled.
208+
type PeerBlockRequestFilter func(p peer.ID, c cid.Cid) bool
209+
204210
type Option func(*Engine)
205211

206212
func WithTaskComparator(comparator TaskComparator) Option {
@@ -209,6 +215,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
209215
}
210216
}
211217

218+
func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
219+
return func(e *Engine) {
220+
e.peerBlockRequestFilter = pbrf
221+
}
222+
}
223+
212224
func WithTargetMessageSize(size int) Option {
213225
return func(e *Engine) {
214226
e.targetMessageSize = size
@@ -647,8 +659,14 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
647659
// Add each want-have / want-block to the ledger
648660
l.Wants(c, entry.Priority, entry.WantType)
649661

650-
// If the block was not found
651-
if !found {
662+
// Check if the peer is allowed to retrieve this block
663+
passFilter := true
664+
if e.peerBlockRequestFilter != nil {
665+
passFilter = e.peerBlockRequestFilter(p, c)
666+
}
667+
668+
// If the block was not found or the peer doesn't pass the policy
669+
if !found || !passFilter {
652670
log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)
653671

654672
// Only add the task to the queue if the requester wants a DONT_HAVE

internal/decision/engine_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,88 @@ func TestTaskComparator(t *testing.T) {
11121112
}
11131113
}
11141114

1115+
func TestPeerBlockFilter(t *testing.T) {
1116+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
1117+
defer cancel()
1118+
1119+
// Generate a few keys
1120+
keys := []string{"a", "b", "c"}
1121+
cids := make(map[cid.Cid]int)
1122+
blks := make([]blocks.Block, 0, len(keys))
1123+
for i, letter := range keys {
1124+
block := blocks.NewBlock([]byte(letter))
1125+
blks = append(blks, block)
1126+
cids[block.Cid()] = i
1127+
}
1128+
1129+
// Generate a few peers
1130+
peerIDs := make([]peer.ID, len(keys))
1131+
for _, i := range cids {
1132+
peerID := libp2ptest.RandPeerIDFatal(t)
1133+
peerIDs[i] = peerID
1134+
}
1135+
1136+
// Setup the peer
1137+
fpt := &fakePeerTagger{}
1138+
sl := NewTestScoreLedger(shortTerm, nil, clock.New())
1139+
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
1140+
if err := bs.PutMany(ctx, blks); err != nil {
1141+
t.Fatal(err)
1142+
}
1143+
1144+
// use a single task worker so that the order of outgoing messages is deterministic
1145+
engineTaskWorkerCount := 1
1146+
e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl,
1147+
// if this Option is omitted, the test fails
1148+
WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool {
1149+
// peer 0 has access to everything
1150+
if p == peerIDs[0] {
1151+
return true
1152+
}
1153+
// peer 1 has access to key b and c
1154+
if p == peerIDs[1] {
1155+
return blks[1].Cid().Equals(c) || blks[2].Cid().Equals(c)
1156+
}
1157+
// peer 2 and other have access to key c
1158+
return blks[2].Cid().Equals(c)
1159+
}),
1160+
)
1161+
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
1162+
1163+
// Create wants requests
1164+
for _, peerID := range peerIDs {
1165+
partnerWantBlocks(e, keys, peerID)
1166+
}
1167+
1168+
// check that outgoing messages are sent with the correct content
1169+
checkPeer := func(peerIndex int, expectedBlocks []blocks.Block) {
1170+
next := <-e.Outbox()
1171+
envelope := <-next
1172+
1173+
peerID := peerIDs[peerIndex]
1174+
responseBlocks := envelope.Message.Blocks()
1175+
1176+
if peerID != envelope.Peer {
1177+
t.Errorf("(Peer%v) expected message for peer ID %#v but instead got message for peer ID %#v", peerIndex, peerID, envelope.Peer)
1178+
}
1179+
1180+
if len(responseBlocks) != len(expectedBlocks) {
1181+
t.Errorf("(Peer%v) expected %v block in response but instead got %v", peerIndex, len(expectedBlocks), len(responseBlocks))
1182+
}
1183+
1184+
// TODO: figure out how to make this test deterministic (sort or use a set?)
1185+
// for i, expectedBlock := range expectedBlocks {
1186+
// if responseBlocks[i].Cid() != expectedBlock.Cid() {
1187+
// t.Errorf("(Peer%v) expected block with CID %#v but instead got block with CID %#v", peerIndex, expectedBlock.Cid(), responseBlocks[i].Cid())
1188+
// }
1189+
// }
1190+
}
1191+
1192+
checkPeer(0, blks[0:3])
1193+
checkPeer(1, blks[1:3])
1194+
checkPeer(2, blks[2:3])
1195+
}
1196+
11151197
func TestTaggingPeers(t *testing.T) {
11161198
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
11171199
defer cancel()

0 commit comments

Comments
 (0)