Skip to content

Commit de4dd0b

Browse files
Merge branch 'master' into reprovide-sweep
2 parents 0521984 + 010ef38 commit de4dd0b

File tree

11 files changed

+34
-35
lines changed

11 files changed

+34
-35
lines changed

dht.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
195195
return nil, err
196196
}
197197

198-
dht, err := makeDHT(h, cfg)
198+
dht, err := makeDHT(ctx, h, cfg)
199199
if err != nil {
200200
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
201201
}
@@ -280,7 +280,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
280280
return dht
281281
}
282282

283-
func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
283+
func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
284284
var protocols, serverProtocols []protocol.ID
285285

286286
v1proto := cfg.ProtocolPrefix + kad1
@@ -352,20 +352,20 @@ func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
352352
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
353353
}
354354

355+
// create a tagged context derived from the original context
356+
// the DHT context should be done when the process is closed
357+
dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(ctx))
358+
355359
// rt refresh manager
356360
dht.rtRefreshManager, err = makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
357361
if err != nil {
358362
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
359363
}
360364

361-
// create a tagged context derived from the original context
362-
// the DHT context should be done when the process is closed
363-
dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(context.Background()))
364-
365365
if cfg.ProviderStore != nil {
366366
dht.providerStore = cfg.ProviderStore
367367
} else {
368-
dht.providerStore, err = providers.NewProviderManager(h.ID(), dht.peerstore, cfg.Datastore)
368+
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
369369
if err != nil {
370370
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
371371
}
@@ -402,6 +402,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
402402
}
403403

404404
r, err := rtrefresh.NewRtRefreshManager(
405+
dht.ctx,
405406
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
406407
keyGenFnc,
407408
queryFnc,

fullrt/dht.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
184184
ctx, cancel := context.WithCancel(context.Background())
185185

186186
self := h.ID()
187-
pm, err := providers.NewProviderManager(self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
187+
pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
188188
if err != nil {
189189
cancel()
190190
return nil, err

go.mod

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ module github.com/libp2p/go-libp2p-kad-dht
22

33
go 1.23.8
44

5-
toolchain go1.24.3
6-
75
require (
86
github.com/google/gopacket v1.1.19
97
github.com/google/uuid v1.6.0
108
github.com/hashicorp/golang-lru v1.0.2
11-
github.com/ipfs/boxo v0.29.1
9+
github.com/ipfs/boxo v0.30.0
1210
github.com/ipfs/go-cid v0.5.0
1311
github.com/ipfs/go-datastore v0.8.2
1412
github.com/ipfs/go-detect-race v0.0.1
@@ -77,7 +75,7 @@ require (
7775
github.com/libp2p/go-yamux/v5 v5.0.0 // indirect
7876
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
7977
github.com/mattn/go-isatty v0.0.20 // indirect
80-
github.com/miekg/dns v1.1.65 // indirect
78+
github.com/miekg/dns v1.1.66 // indirect
8179
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
8280
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
8381
github.com/minio/sha256-simd v1.0.1 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
142142
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
143143
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
144144
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
145-
github.com/ipfs/boxo v0.29.1 h1:z61ZT4YDfTHLjXTsu/+3wvJ8aJlExthDSOCpx6Nh8xc=
146-
github.com/ipfs/boxo v0.29.1/go.mod h1:MkDJStXiJS9U99cbAijHdcmwNfVn5DKYBmQCOgjY2NU=
145+
github.com/ipfs/boxo v0.30.0 h1:7afsoxPGGqfoH7Dum/wOTGUB9M5fb8HyKPMlLfBvIEQ=
146+
github.com/ipfs/boxo v0.30.0/go.mod h1:BPqgGGyHB9rZZcPSzah2Dc9C+5Or3U1aQe7EH1H7370=
147147
github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=
148148
github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM=
149149
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
@@ -259,8 +259,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
259259
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
260260
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
261261
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
262-
github.com/miekg/dns v1.1.65 h1:0+tIPHzUW0GCge7IiK3guGP57VAw7hoPDfApjkMD1Fc=
263-
github.com/miekg/dns v1.1.65/go.mod h1:Dzw9769uoKVaLuODMDZz9M6ynFU6Em65csPuoi8G0ck=
262+
github.com/miekg/dns v1.1.66 h1:FeZXOS3VCVsKnEAd+wBkjMC3D2K+ww66Cq3VnCINuJE=
263+
github.com/miekg/dns v1.1.66/go.mod h1:jGFzBsSNbJw6z1HYut1RKBKHA9PBdxeHrZG8J+gC2WE=
264264
github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8=
265265
github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c/go.mod h1:0SQS9kMwD2VsyFEB++InYyBJroV/FRmBgcydeSUcJms=
266266
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b h1:z78hV3sbSMAUoyUMM0I83AUIT6Hu17AWfgjzIbtrYFc=

internal/metrics/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,6 @@ func RecordMessageSendErr(ctx context.Context) {
171171
sentMessageErrors.Add(ctx, 1, attrSetOpt)
172172
}
173173

174-
func RecordNetworkSize(ns int64) {
175-
networkSize.Record(context.Background(), int64(ns))
174+
func RecordNetworkSize(ctx context.Context, ns int64) {
175+
networkSize.Record(ctx, int64(ns))
176176
}

lookup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
4848
}
4949

5050
if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
51-
metrics.RecordNetworkSize(int64(ns))
51+
metrics.RecordNetworkSize(dht.ctx, int64(ns))
5252
}
5353

5454
// Reset the refresh timer for this key's bucket since we've just

lookup_optim.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ func (dht *IpfsDHT) optimisticProvide(outerCtx context.Context, keyMH multihash.
114114
return errors.New("can't lookup empty key")
115115
}
116116

117-
// initialize new context for all putProvider operations.
117+
// use dht.ctx for all putProvider operations.
118118
// We don't want to give the outer context to the put operations as we return early before all
119119
// put operations have finished to avoid the long tail of the latency distribution. If we
120120
// provided the outer context the put operations may be cancelled depending on what happens
121121
// with the context on the user side.
122-
putCtx, putCtxCancel := context.WithTimeout(context.Background(), time.Minute)
122+
putCtx, putCtxCancel := context.WithTimeout(dht.ctx, time.Minute)
123123

124124
es, err := dht.newOptimisticState(putCtx, key)
125125
if err != nil {
@@ -177,7 +177,7 @@ func (dht *IpfsDHT) optimisticProvide(outerCtx context.Context, keyMH multihash.
177177
}
178178

179179
if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
180-
metrics.RecordNetworkSize(int64(ns))
180+
metrics.RecordNetworkSize(dht.ctx, int64(ns))
181181
}
182182

183183
// refresh the cpl for this key as the query was successful

providers/providers_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type getProv struct {
117117
}
118118

119119
// NewProviderManager constructor
120-
func NewProviderManager(local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
120+
func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
121121
pm := new(ProviderManager)
122122
pm.self = local
123123
pm.getprovs = make(chan *getProv)
@@ -133,7 +133,7 @@ func NewProviderManager(local peer.ID, ps peerstore.Peerstore, dstore ds.Batchin
133133
if err := pm.applyOptions(opts...); err != nil {
134134
return nil, err
135135
}
136-
pm.ctx, pm.cancel = context.WithCancel(context.Background())
136+
pm.ctx, pm.cancel = context.WithCancel(ctx)
137137
pm.run()
138138
return pm, nil
139139
}

providers/providers_manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestProviderManager(t *testing.T) {
3131
if err != nil {
3232
t.Fatal(err)
3333
}
34-
p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
34+
p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
3535
if err != nil {
3636
t.Fatal(err)
3737
}
@@ -77,7 +77,7 @@ func TestProvidersDatastore(t *testing.T) {
7777
t.Fatal(err)
7878
}
7979

80-
p, err := NewProviderManager(mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
80+
p, err := NewProviderManager(ctx, mid, ps, dssync.MutexWrap(ds.NewMapDatastore()))
8181
if err != nil {
8282
t.Fatal(err)
8383
}
@@ -164,7 +164,7 @@ func TestProvidesExpire(t *testing.T) {
164164
if err != nil {
165165
t.Fatal(err)
166166
}
167-
p, err := NewProviderManager(mid, ps, ds)
167+
p, err := NewProviderManager(ctx, mid, ps, ds)
168168
if err != nil {
169169
t.Fatal(err)
170170
}
@@ -278,7 +278,7 @@ func TestLargeProvidersSet(t *testing.T) {
278278
t.Fatal(err)
279279
}
280280

281-
p, err := NewProviderManager(mid, ps, dstore)
281+
p, err := NewProviderManager(ctx, mid, ps, dstore)
282282
if err != nil {
283283
t.Fatal(err)
284284
}
@@ -318,7 +318,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
318318
t.Fatal(err)
319319
}
320320

321-
pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
321+
pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
322322
if err != nil {
323323
t.Fatal(err)
324324
}
@@ -347,7 +347,7 @@ func TestWriteUpdatesCache(t *testing.T) {
347347
t.Fatal(err)
348348
}
349349

350-
pm, err := NewProviderManager(p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
350+
pm, err := NewProviderManager(ctx, p1, ps, dssync.MutexWrap(ds.NewMapDatastore()))
351351
if err != nil {
352352
t.Fatal(err)
353353
}

rtrefresh/rt_refresh_manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,16 @@ type RtRefreshManager struct {
5757
refreshDoneCh chan struct{} // write to this channel after every refresh
5858
}
5959

60-
func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
60+
func NewRtRefreshManager(ctx context.Context, h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
6161
refreshKeyGenFnc func(cpl uint) (string, error),
6262
refreshQueryFnc func(ctx context.Context, key string) error,
6363
refreshPingFnc func(ctx context.Context, p peer.ID) error,
6464
refreshQueryTimeout time.Duration,
6565
refreshInterval time.Duration,
6666
successfulOutboundQueryGracePeriod time.Duration,
67-
refreshDoneCh chan struct{}) (*RtRefreshManager, error) {
68-
69-
ctx, cancel := context.WithCancel(context.Background())
67+
refreshDoneCh chan struct{},
68+
) (*RtRefreshManager, error) {
69+
ctx, cancel := context.WithCancel(ctx)
7070
return &RtRefreshManager{
7171
ctx: ctx,
7272
cancel: cancel,

version.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"version": "v0.32.0"
2+
"version": "v0.33.0"
33
}

0 commit comments

Comments
 (0)