Skip to content

Commit 1cbdbdf

Browse files
committed
extract messaging components from IpfsDHT into its own struct. create a new struct that manages sending DHT messages that can be used independently from the DHT.
1 parent 9304f55 commit 1cbdbdf

File tree

9 files changed

+216
-168
lines changed

9 files changed

+216
-168
lines changed

dht.go

Lines changed: 3 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package dht
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
@@ -33,7 +32,6 @@ import (
3332
goprocessctx "github.com/jbenet/goprocess/context"
3433
"github.com/multiformats/go-base32"
3534
ma "github.com/multiformats/go-multiaddr"
36-
"github.com/multiformats/go-multihash"
3735
"go.opencensus.io/tag"
3836
"go.uber.org/zap"
3937
)
@@ -97,8 +95,7 @@ type IpfsDHT struct {
9795
ctx context.Context
9896
proc goprocess.Process
9997

100-
strmap map[peer.ID]*messageSender
101-
smlk sync.Mutex
98+
protoMessenger *ProtocolMessenger
10299

103100
plk sync.Mutex
104101

@@ -188,6 +185,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
188185
dht.enableValues = cfg.enableValues
189186

190187
dht.Validator = cfg.validator
188+
dht.protoMessenger = NewProtocolMessenger(dht.host, dht.protocols, dht.Validator)
191189

192190
dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing
193191

@@ -274,7 +272,6 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
274272
selfKey: kb.ConvertPeerID(h.ID()),
275273
peerstore: h.Peerstore(),
276274
host: h,
277-
strmap: make(map[peer.ID]*messageSender),
278275
birth: time.Now(),
279276
protocols: protocols,
280277
protocolsStrs: protocol.ConvertToStrings(protocols),
@@ -507,67 +504,8 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
507504
}
508505
}
509506

510-
// putValueToPeer stores the given key/value pair at the peer 'p'
511-
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
512-
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
513-
pmes.Record = rec
514-
rpmes, err := dht.sendRequest(ctx, p, pmes)
515-
if err != nil {
516-
logger.Debugw("failed to put value to peer", "to", p, "key", loggableRecordKeyBytes(rec.Key), "error", err)
517-
return err
518-
}
519-
520-
if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
521-
logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
522-
return errors.New("value not put correctly")
523-
}
524-
525-
return nil
526-
}
527-
528507
var errInvalidRecord = errors.New("received invalid record")
529508

530-
// getValueOrPeers queries a particular peer p for the value for
531-
// key. It returns either the value or a list of closer peers.
532-
// NOTE: It will update the dht's peerstore with any new addresses
533-
// it finds for the given peer.
534-
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
535-
pmes, err := dht.getValueSingle(ctx, p, key)
536-
if err != nil {
537-
return nil, nil, err
538-
}
539-
540-
// Perhaps we were given closer peers
541-
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
542-
543-
if rec := pmes.GetRecord(); rec != nil {
544-
// Success! We were given the value
545-
logger.Debug("got value")
546-
547-
// make sure record is valid.
548-
err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
549-
if err != nil {
550-
logger.Debug("received invalid record (discarded)")
551-
// return a sentinal to signify an invalid record was received
552-
err = errInvalidRecord
553-
rec = new(recpb.Record)
554-
}
555-
return rec, peers, err
556-
}
557-
558-
if len(peers) > 0 {
559-
return nil, peers, nil
560-
}
561-
562-
return nil, nil, routing.ErrNotFound
563-
}
564-
565-
// getValueSingle simply performs the get value RPC with the given parameters
566-
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
567-
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
568-
return dht.sendRequest(ctx, p, pmes)
569-
}
570-
571509
// getLocal attempts to retrieve the value from the datastore
572510
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
573511
logger.Debugw("finding value in datastore", "key", loggableRecordKeyString(key))
@@ -696,17 +634,6 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
696634
}
697635
}
698636

699-
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
700-
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
701-
pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
702-
return dht.sendRequest(ctx, p, pmes)
703-
}
704-
705-
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
706-
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
707-
return dht.sendRequest(ctx, p, pmes)
708-
}
709-
710637
// nearestPeersToQuery returns the routing tables closest peers.
711638
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
712639
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
@@ -847,15 +774,7 @@ func (dht *IpfsDHT) Host() host.Host {
847774

848775
// Ping sends a ping message to the passed peer and waits for a response.
849776
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
850-
req := pb.NewMessage(pb.Message_PING, nil, 0)
851-
resp, err := dht.sendRequest(ctx, p, req)
852-
if err != nil {
853-
return fmt.Errorf("sending request: %w", err)
854-
}
855-
if resp.Type != pb.Message_PING {
856-
return fmt.Errorf("got unexpected response type: %v", resp.Type)
857-
}
858-
return nil
777+
return dht.protoMessenger.Ping(ctx, p)
859778
}
860779

861780
// newContextWithLocalTags returns a new context.Context with the InstanceID and

dht_net.go

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"time"
1010

1111
"github.com/libp2p/go-libp2p-core/helpers"
12+
"github.com/libp2p/go-libp2p-core/host"
1213
"github.com/libp2p/go-libp2p-core/network"
1314
"github.com/libp2p/go-libp2p-core/peer"
15+
"github.com/libp2p/go-libp2p-core/protocol"
1416
"github.com/libp2p/go-msgio/protoio"
1517

1618
"github.com/libp2p/go-libp2p-kad-dht/metrics"
@@ -207,12 +209,38 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
207209
}
208210
}
209211

212+
type messageManager struct {
213+
host host.Host // the network services we need
214+
strmap map[peer.ID]*messageSender
215+
smlk sync.Mutex
216+
protocols []protocol.ID
217+
}
218+
219+
func (m *messageManager) streamDisconnect(ctx context.Context, p peer.ID) {
220+
m.smlk.Lock()
221+
defer m.smlk.Unlock()
222+
ms, ok := m.strmap[p]
223+
if !ok {
224+
return
225+
}
226+
delete(m.strmap, p)
227+
228+
// Do this asynchronously as ms.lk can block for a while.
229+
go func() {
230+
if err := ms.lk.Lock(ctx); err != nil {
231+
return
232+
}
233+
defer ms.lk.Unlock()
234+
ms.invalidate()
235+
}()
236+
}
237+
210238
// sendRequest sends out a request, but also makes sure to
211239
// measure the RTT for latency measurements.
212-
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
240+
func (m *messageManager) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
213241
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))
214242

215-
ms, err := dht.messageSenderForPeer(ctx, p)
243+
ms, err := m.messageSenderForPeer(ctx, p)
216244
if err != nil {
217245
stats.Record(ctx,
218246
metrics.SentRequests.M(1),
@@ -239,15 +267,15 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
239267
metrics.SentBytes.M(int64(pmes.Size())),
240268
metrics.OutboundRequestLatency.M(float64(time.Since(start))/float64(time.Millisecond)),
241269
)
242-
dht.peerstore.RecordLatency(p, time.Since(start))
270+
m.host.Peerstore().RecordLatency(p, time.Since(start))
243271
return rpmes, nil
244272
}
245273

246274
// sendMessage sends out a message
247-
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
275+
func (m *messageManager) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
248276
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))
249277

250-
ms, err := dht.messageSenderForPeer(ctx, p)
278+
ms, err := m.messageSenderForPeer(ctx, p)
251279
if err != nil {
252280
stats.Record(ctx,
253281
metrics.SentMessages.M(1),
@@ -273,30 +301,30 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
273301
return nil
274302
}
275303

276-
func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
277-
dht.smlk.Lock()
278-
ms, ok := dht.strmap[p]
304+
func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
305+
m.smlk.Lock()
306+
ms, ok := m.strmap[p]
279307
if ok {
280-
dht.smlk.Unlock()
308+
m.smlk.Unlock()
281309
return ms, nil
282310
}
283-
ms = &messageSender{p: p, dht: dht, lk: newCtxMutex()}
284-
dht.strmap[p] = ms
285-
dht.smlk.Unlock()
311+
ms = &messageSender{p: p, m: m, lk: newCtxMutex()}
312+
m.strmap[p] = ms
313+
m.smlk.Unlock()
286314

287315
if err := ms.prepOrInvalidate(ctx); err != nil {
288-
dht.smlk.Lock()
289-
defer dht.smlk.Unlock()
316+
m.smlk.Lock()
317+
defer m.smlk.Unlock()
290318

291-
if msCur, ok := dht.strmap[p]; ok {
319+
if msCur, ok := m.strmap[p]; ok {
292320
// Changed. Use the new one, old one is invalid and
293321
// not in the map so we can just throw it away.
294322
if ms != msCur {
295323
return msCur, nil
296324
}
297325
// Not changed, remove the now invalid stream from the
298326
// map.
299-
delete(dht.strmap, p)
327+
delete(m.strmap, p)
300328
}
301329
// Invalid but not in map. Must have been removed by a disconnect.
302330
return nil, err
@@ -306,11 +334,11 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
306334
}
307335

308336
type messageSender struct {
309-
s network.Stream
310-
r msgio.ReadCloser
311-
lk ctxMutex
312-
p peer.ID
313-
dht *IpfsDHT
337+
s network.Stream
338+
r msgio.ReadCloser
339+
lk ctxMutex
340+
p peer.ID
341+
m *messageManager
314342

315343
invalid bool
316344
singleMes int
@@ -351,7 +379,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
351379
// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
352380
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
353381
// backwards compatibility reasons).
354-
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
382+
nstr, err := ms.m.host.NewStream(ctx, ms.p, ms.m.protocols...)
355383
if err != nil {
356384
return err
357385
}

dht_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -570,14 +570,14 @@ func TestInvalidMessageSenderTracking(t *testing.T) {
570570
defer dht.Close()
571571

572572
foo := peer.ID("asdasd")
573-
_, err := dht.messageSenderForPeer(ctx, foo)
573+
_, err := dht.protoMessenger.m.messageSenderForPeer(ctx, foo)
574574
if err == nil {
575575
t.Fatal("that shouldnt have succeeded")
576576
}
577577

578-
dht.smlk.Lock()
579-
mscnt := len(dht.strmap)
580-
dht.smlk.Unlock()
578+
dht.protoMessenger.m.smlk.Lock()
579+
mscnt := len(dht.protoMessenger.m.strmap)
580+
dht.protoMessenger.m.smlk.Unlock()
581581

582582
if mscnt > 0 {
583583
t.Fatal("should have no message senders in map")

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
5757
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
5858
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
5959
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
60+
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
6061
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
6162
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
6263
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@@ -66,6 +67,7 @@ github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ
6667
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
6768
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
6869
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
70+
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
6971
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
7072
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
7173
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
@@ -91,6 +93,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
9193
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
9294
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
9395
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
96+
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
9497
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
9598
github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
9699
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
@@ -467,6 +470,7 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI
467470
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
468471
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
469472
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
473+
github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=
470474
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
471475
github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ=
472476
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
@@ -626,6 +630,7 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
626630
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425 h1:VvQyQJN0tSuecqgcIxMWnnfG5kSmgy9KZR9sW3W5QeA=
627631
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
628632
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
633+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
629634
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
630635
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
631636
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -650,6 +655,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
650655
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
651656
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
652657
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
658+
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
653659
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
654660
gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8=
655661
gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE=

lookup.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/libp2p/go-libp2p-core/peer"
99
"github.com/libp2p/go-libp2p-core/routing"
1010

11-
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
1211
kb "github.com/libp2p/go-libp2p-kbucket"
1312
)
1413

@@ -30,12 +29,11 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
3029
ID: p,
3130
})
3231

33-
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
32+
peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, peer.ID(key))
3433
if err != nil {
3534
logger.Debugf("error getting closer peers: %s", err)
3635
return nil, err
3736
}
38-
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
3937

4038
// For DHT query command
4139
routing.PublishQueryEvent(ctx, &routing.QueryEvent{

0 commit comments

Comments
 (0)