Skip to content

feat(bitswap): add option to disable Bitswap server #911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 30, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The following emojis are used to highlight certain changes:
- `fetcher/impl/blockservice`: new option `SkipNotFound` for the IPLD fetcher. It will skip not found nodes when traversing the DAG. This allows offline traversal of DAGs when using, for example, an offline blockservice.
- This enables use case of providing lazy-loaded, partially local DAGs (like `ipfs files` in Kubo's MFS implementation, see [kubo#10386](https://github.com/ipfs/kubo/issues/10386))
- `gateway`: generated HTML with UnixFS directory listings now include a button for copying CIDs of child entities [#899](https://github.com/ipfs/boxo/pull/899)
- `bitswap/server`: Add ability to enable/disable bitswap server using `WithServerEnabled` bitswap option (#911)[https://github.com/ipfs/boxo/pull/911]

### Changed

Expand Down
74 changes: 50 additions & 24 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ type Bitswap struct {
*client.Client
*server.Server

tracer tracer.Tracer
net network.BitSwapNetwork
tracer tracer.Tracer
net network.BitSwapNetwork
serverEnabled bool
}

func New(ctx context.Context, net network.BitSwapNetwork, providerFinder routing.ContentDiscovery, bstore blockstore.Blockstore, options ...Option) *Bitswap {
bs := &Bitswap{
net: net,
net: net,
serverEnabled: true,
}

var serverOptions []server.Option
Expand All @@ -83,19 +85,24 @@ func New(ctx context.Context, net network.BitSwapNetwork, providerFinder routing
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
if bs.serverEnabled {
bs.Server = server.New(ctx, net, bstore, serverOptions...)
clientOptions = append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))
}
bs.Client = client.New(ctx, net, providerFinder, bstore, clientOptions...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once

return bs
}

func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
bs.Server.NotifyNewBlocks(ctx, blks...),
)
if bs.Server != nil {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
bs.Server.NotifyNewBlocks(ctx, blks...),
)
}
return bs.Client.NotifyNewBlocks(ctx, blks...)
}

type Stat struct {
Expand All @@ -115,46 +122,63 @@ func (bs *Bitswap) Stat() (*Stat, error) {
if err != nil {
return nil, err
}
ss, err := bs.Server.Stat()
if err != nil {
return nil, err
}

return &Stat{
// Initialize stat with client stats
stat := &Stat{
Wantlist: cs.Wantlist,
BlocksReceived: cs.BlocksReceived,
DataReceived: cs.DataReceived,
DupBlksReceived: cs.DupBlksReceived,
DupDataReceived: cs.DupDataReceived,
MessagesReceived: cs.MessagesReceived,
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
}, nil
// Server stats will be added conditionally
}

// Stats only available if server is enabled
if bs.Server != nil {
ss, err := bs.Server.Stat()
if err != nil {
return stat, fmt.Errorf("failed to get server stats: %w", err)
}
stat.Peers = ss.Peers
stat.BlocksSent = ss.BlocksSent
stat.DataSent = ss.DataSent
}

return stat, nil
}

func (bs *Bitswap) Close() error {
bs.net.Stop()
bs.Client.Close()
bs.Server.Close()
if bs.Server != nil {
bs.Server.Close()
}
return nil
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
if p == bs.net.Self() {
return bs.Client.GetWantlist()
}
return bs.Server.WantlistForPeer(p)
if bs.Server != nil {
return bs.Server.WantlistForPeer(p)
}
return nil
}

func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.Client.PeerConnected(p)
bs.Server.PeerConnected(p)
if bs.Server != nil {
bs.Server.PeerConnected(p)
}
}

func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.Client.PeerDisconnected(p)
bs.Server.PeerDisconnected(p)
if bs.Server != nil {
bs.Server.PeerDisconnected(p)
}
}

func (bs *Bitswap) ReceiveError(err error) {
Expand All @@ -169,5 +193,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming messa
}

bs.Client.ReceiveMessage(ctx, p, incoming)
bs.Server.ReceiveMessage(ctx, p, incoming)
if bs.Server != nil {
bs.Server.ReceiveMessage(ctx, p, incoming)
}
}
119 changes: 119 additions & 0 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
tu "github.com/libp2p/go-libp2p-testing/etc"
p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

const blockSize = 4
Expand Down Expand Up @@ -832,3 +833,121 @@ func TestWithScoreLedger(t *testing.T) {
t.Fatal("Expected the score ledger to be closed within 5s")
}
}

// TestWithServerDisabled tests that BitSwap can function properly with the server disabled.
// In this mode, it should still be able to request and receive blocks from other peers,
// but should not respond to requests from others.
func TestWithServerDisabled(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
router := mockrouting.NewServer()

// Create instance generator with default options
igWithServer := testinstance.NewTestInstanceGenerator(net, router, nil, nil)
defer igWithServer.Close()

// Create instance generator with server disabled
igNoServer := testinstance.NewTestInstanceGenerator(net, router, nil, []bitswap.Option{
bitswap.WithServerEnabled(false),
})
defer igNoServer.Close()

// Create two peers:
// - peerWithServer: has server enabled and has the block
// - peerNoServer: has server disabled
peerWithServer := igWithServer.Next()
peerNoServer := igNoServer.Next()

// We need to connect the peers so they can communicate
connectPeers(t, net, peerWithServer, peerNoServer)

// Add block to peerWithServer
addBlock(t, context.Background(), peerWithServer, block)

// Test 1: peerNoServer should be able to get a block from peerWithServer
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
receivedBlock, err := peerNoServer.Exchange.GetBlock(ctx, block.Cid())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.RawData(), receivedBlock.RawData()) {
t.Fatal("Data doesn't match")
}

// Test 2: Now try the opposite - peerWithServer should NOT be able to get a block from peerNoServer
// because peerNoServer has server disabled and won't respond to requests
newBlock := blocks.NewBlock([]byte("newblock"))
addBlock(t, context.Background(), peerNoServer, newBlock)

ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2()
_, err = peerWithServer.Exchange.GetBlock(ctx2, newBlock.Cid())

// We expect this to fail with deadline exceeded because peerNoServer won't respond
if err != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded error, got: %v", err)
}

// Test 3: Verify that Stat() returns correct values even with server disabled
noServerStat, err := peerNoServer.Exchange.Stat()
if err != nil {
t.Fatal(err)
}

// With server disabled, we should still have valid client stats
if noServerStat.BlocksReceived != 1 {
t.Errorf("Expected BlocksReceived to be 1, got %d", noServerStat.BlocksReceived)
}
if noServerStat.DataReceived != uint64(len(block.RawData())) {
t.Errorf("Expected DataReceived to be %d, got %d", len(block.RawData()), noServerStat.DataReceived)
}

// With server disabled, server stats should be zero
if noServerStat.BlocksSent != 0 {
t.Errorf("Expected BlocksSent to be 0, got %d", noServerStat.BlocksSent)
}
if noServerStat.DataSent != 0 {
t.Errorf("Expected DataSent to be 0, got %d", noServerStat.DataSent)
}
}

// Helper function to connect two peers
func connectPeers(t *testing.T, net tn.Network, a, b testinstance.Instance) {
t.Helper()
err := a.Adapter.Connect(context.Background(), peer.AddrInfo{
ID: b.Identity.ID(),
Addrs: []multiaddr.Multiaddr{b.Identity.Address()},
})
if err != nil {
t.Fatal(err)
}
}

// TestServerDisabledNotifyNewBlocks tests that NotifyNewBlocks works correctly
// when the server is disabled.
func TestServerDisabledNotifyNewBlocks(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
router := mockrouting.NewServer()

// Create instance with server disabled
ig := testinstance.NewTestInstanceGenerator(net, router, nil, []bitswap.Option{
bitswap.WithServerEnabled(false),
})
defer ig.Close()

instance := ig.Next()
block := blocks.NewBlock([]byte("test block"))

// Add block to blockstore
err := instance.Blockstore.Put(context.Background(), block)
if err != nil {
t.Fatal(err)
}

// Notify about new block - this should not cause errors even with server disabled
err = instance.Exchange.NotifyNewBlocks(context.Background(), block)
if err != nil {
t.Fatalf("NotifyNewBlocks failed with server disabled: %s", err)
}
}
8 changes: 8 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ func WithTracer(tap tracer.Tracer) Option {
}
}

func WithServerEnabled(enabled bool) Option {
return Option{
option(func(bs *Bitswap) {
bs.serverEnabled = enabled
}),
}
}

func WithClientOption(opt client.Option) Option {
return Option{opt}
}
Expand Down