diff --git a/dht_test.go b/dht_test.go index 9527d380..0080ee9a 100644 --- a/dht_test.go +++ b/dht_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-kad-dht/internal" "github.com/libp2p/go-libp2p-kad-dht/internal/net" "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/libp2p/go-libp2p/core/crypto" @@ -35,7 +36,6 @@ import ( test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - u "github.com/ipfs/boxo/util" "github.com/ipfs/go-cid" detectrace "github.com/ipfs/go-detect-race" kb "github.com/libp2p/go-libp2p-kbucket" @@ -53,10 +53,10 @@ func init() { var newCid cid.Cid switch i % 3 { case 0: - mhv := u.Hash([]byte(v)) + mhv := internal.Hash([]byte(v)) newCid = cid.NewCidV0(mhv) case 1: - mhv := u.Hash([]byte(v)) + mhv := internal.Hash([]byte(v)) newCid = cid.NewCidV1(cid.DagCBOR, mhv) case 2: rawMh := make([]byte, 12) @@ -857,7 +857,7 @@ func TestRefresh(t *testing.T) { time.Sleep(time.Microsecond * 50) } - if u.Debug { + if testing.Verbose() { // the routing tables should be full now. let's inspect them. printRoutingTables(dhts) } @@ -1002,7 +1002,7 @@ func TestPeriodicRefresh(t *testing.T) { } } - if u.Debug { + if testing.Verbose() { printRoutingTables(dhts) } @@ -1021,7 +1021,7 @@ func TestPeriodicRefresh(t *testing.T) { // until the routing tables look better, or some long timeout for the failure case. waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second) - if u.Debug { + if testing.Verbose() { printRoutingTables(dhts) } } @@ -1056,7 +1056,7 @@ func TestProvidesMany(t *testing.T) { defer cancel() bootstrap(t, ctxT, dhts) - if u.Debug { + if testing.Verbose() { // the routing tables should be full now. let's inspect them. t.Logf("checking routing table of %d", nDHTs) for _, dht := range dhts { diff --git a/dual/dual_test.go b/dual/dual_test.go index 721f605e..a51738ac 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" - u "github.com/ipfs/boxo/util" "github.com/ipfs/go-cid" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/internal" test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" record "github.com/libp2p/go-libp2p-record" "github.com/libp2p/go-libp2p/core/host" @@ -22,8 +22,8 @@ import ( var wancid, lancid cid.Cid func init() { - wancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("wan cid -- value"))) - lancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("lan cid -- value"))) + wancid = cid.NewCidV1(cid.DagCBOR, internal.Hash([]byte("wan cid -- value"))) + lancid = cid.NewCidV1(cid.DagCBOR, internal.Hash([]byte("lan cid -- value"))) } type blankValidator struct{} diff --git a/fullrt/dht.go b/fullrt/dht.go index 295358a1..1bae1158 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -26,7 +26,6 @@ import ( swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/gogo/protobuf/proto" - u "github.com/ipfs/boxo/util" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" @@ -53,8 +52,10 @@ import ( var logger = logging.Logger("fullrtdht") -const tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt") -const dhtName = "FullRT" +const ( + tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt") + dhtName = "FullRT" +) const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.` @@ -530,7 +531,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts } rec := record.MakePutRecord(key, value) - rec.TimeReceived = u.FormatRFC3339(time.Now()) + rec.TimeReceived = internal.FormatRFC3339(time.Now()) err = dht.putLocal(ctx, key, rec) if err != nil { return err @@ -656,7 +657,8 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing. } func (dht *FullRT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, - out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { + out chan<- []byte, nvals int, +) ([]byte, map[peer.ID]struct{}, bool) { numResponses := 0 return dht.processValues(ctx, key, valCh, func(ctx context.Context, v RecvdVal, better bool) bool { @@ -678,7 +680,8 @@ func (dht *FullRT) searchValueQuorum(ctx context.Context, key string, valCh <-ch } func (dht *FullRT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, - newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { + newVal func(ctx context.Context, v RecvdVal, better bool) bool, +) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { loop: for { if aborted { diff --git a/handlers.go b/handlers.go index c2d49672..00187be6 100644 --- a/handlers.go +++ b/handlers.go @@ -11,7 +11,6 @@ import ( pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore" "github.com/gogo/protobuf/proto" - u "github.com/ipfs/boxo/util" ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-kad-dht/internal" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -115,7 +114,7 @@ func (dht *IpfsDHT) checkLocalDatastore(ctx context.Context, k []byte) (*recpb.R } var recordIsBad bool - recvtime, err := u.ParseRFC3339(rec.GetTimeReceived()) + recvtime, err := internal.ParseRFC3339(rec.GetTimeReceived()) if err != nil { logger.Info("either no receive time set on record, or it was invalid: ", err) recordIsBad = true @@ -206,7 +205,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess } // record the time we receive every record - rec.TimeReceived = u.FormatRFC3339(time.Now()) + rec.TimeReceived = internal.FormatRFC3339(time.Now()) data, err := proto.Marshal(rec) if err != nil { diff --git a/internal/util.go b/internal/util.go new file mode 100644 index 00000000..9162fa17 --- /dev/null +++ b/internal/util.go @@ -0,0 +1,35 @@ +package internal + +import ( + "time" + + mh "github.com/multiformats/go-multihash" +) + +// Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits +func Hash(data []byte) mh.Multihash { + h, err := mh.Sum(data, mh.SHA2_256, -1) + if err != nil { + // this error can be safely ignored (panic) because multihash only fails + // from the selection of hash function. If the fn + length are valid, it + // won't error. + panic("multihash failed to hash using SHA2_256.") + } + return h +} + +// ParseRFC3339 parses an RFC3339Nano-formatted time stamp and +// returns the UTC time. +func ParseRFC3339(s string) (time.Time, error) { + t, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + return time.Time{}, err + } + return t.UTC(), nil +} + +// FormatRFC3339 returns the string representation of the +// UTC value of the given time in RFC3339Nano format. +func FormatRFC3339(t time.Time) string { + return t.UTC().Format(time.RFC3339Nano) +} diff --git a/providers/providers_manager_test.go b/providers/providers_manager_test.go index e830929e..8b52e722 100644 --- a/providers/providers_manager_test.go +++ b/providers/providers_manager_test.go @@ -8,12 +8,12 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-kad-dht/internal" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" mh "github.com/multiformats/go-multihash" - u "github.com/ipfs/boxo/util" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dssync "github.com/ipfs/go-datastore/sync" @@ -35,7 +35,7 @@ func TestProviderManager(t *testing.T) { if err != nil { t.Fatal(err) } - a := u.Hash([]byte("test")) + a := internal.Hash([]byte("test")) p.AddProvider(ctx, a, peer.AddrInfo{ID: peer.ID("testingprovider")}) // Not cached @@ -86,7 +86,7 @@ func TestProvidersDatastore(t *testing.T) { friend := peer.ID("friend") var mhs []mh.Multihash for i := 0; i < 100; i++ { - h := u.Hash([]byte(fmt.Sprint(i))) + h := internal.Hash([]byte(fmt.Sprint(i))) mhs = append(mhs, h) p.AddProvider(ctx, h, peer.AddrInfo{ID: friend}) } @@ -105,7 +105,7 @@ func TestProvidersDatastore(t *testing.T) { func TestProvidersSerialization(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) - k := u.Hash(([]byte("my key!"))) + k := internal.Hash(([]byte("my key!"))) p1 := peer.ID("peer one") p2 := peer.ID("peer two") pt1 := time.Now() @@ -174,7 +174,7 @@ func TestProvidesExpire(t *testing.T) { peers := []peer.ID{"a", "b"} var mhs []mh.Multihash for i := 0; i < 10; i++ { - h := u.Hash([]byte(fmt.Sprint(i))) + h := internal.Hash([]byte(fmt.Sprint(i))) mhs = append(mhs, h) } @@ -235,8 +235,10 @@ func TestProvidesExpire(t *testing.T) { } } -var _ = io.NopCloser -var _ = os.DevNull +var ( + _ = io.NopCloser + _ = os.DevNull +) // TestLargeProvidersSet can be used for profiling. // The datastore can be switched to levelDB by uncommenting the section below and the import above @@ -286,7 +288,7 @@ func TestLargeProvidersSet(t *testing.T) { var mhs []mh.Multihash for i := 0; i < 1000; i++ { - h := u.Hash([]byte(fmt.Sprint(i))) + h := internal.Hash([]byte(fmt.Sprint(i))) mhs = append(mhs, h) for _, pid := range peers { p.AddProvider(ctx, h, peer.AddrInfo{ID: pid}) @@ -311,8 +313,8 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - h1 := u.Hash([]byte("1")) - h2 := u.Hash([]byte("2")) + h1 := internal.Hash([]byte("1")) + h2 := internal.Hash([]byte("2")) ps, err := pstoremem.NewPeerstore() if err != nil { t.Fatal(err) @@ -341,7 +343,7 @@ func TestWriteUpdatesCache(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - h1 := u.Hash([]byte("1")) + h1 := internal.Hash([]byte("1")) ps, err := pstoremem.NewPeerstore() if err != nil { t.Fatal(err) diff --git a/records_test.go b/records_test.go index b6b44c6e..69603b0b 100644 --- a/records_test.go +++ b/records_test.go @@ -7,9 +7,9 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-kad-dht/internal" "github.com/libp2p/go-libp2p/core/test" - u "github.com/ipfs/boxo/util" "github.com/ipfs/go-test/random" record "github.com/libp2p/go-libp2p-record" tnet "github.com/libp2p/go-libp2p-testing/net" @@ -205,7 +205,7 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) { // Store incorrect public key on node B rec := record.MakePutRecord(pkkey, wrongbytes) - rec.TimeReceived = u.FormatRFC3339(time.Now()) + rec.TimeReceived = internal.FormatRFC3339(time.Now()) err = dhtB.putLocal(ctx, pkkey, rec) if err != nil { t.Fatal(err) @@ -244,7 +244,7 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { // Store incorrect public key on node B rec := record.MakePutRecord(pkkey, wrongbytes) - rec.TimeReceived = u.FormatRFC3339(time.Now()) + rec.TimeReceived = internal.FormatRFC3339(time.Now()) err = dhtB.putLocal(ctx, pkkey, rec) if err != nil { t.Fatal(err) @@ -317,9 +317,7 @@ func TestValuesDisabled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var ( - optsA, optsB []Option - ) + var optsA, optsB []Option optsA = append(optsA, ProtocolPrefix("/valuesMaybeDisabled")) optsB = append(optsB, ProtocolPrefix("/valuesMaybeDisabled")) diff --git a/routing.go b/routing.go index 1c8280fc..2753d708 100644 --- a/routing.go +++ b/routing.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - u "github.com/ipfs/boxo/util" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-kad-dht/internal" internalConfig "github.com/libp2p/go-libp2p-kad-dht/internal/config" @@ -65,7 +64,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts } rec := record.MakePutRecord(key, value) - rec.TimeReceived = u.FormatRFC3339(time.Now()) + rec.TimeReceived = internal.FormatRFC3339(time.Now()) err = dht.putLocal(ctx, key, rec) if err != nil { return err @@ -195,7 +194,8 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing } func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan recvdVal, stopCh chan struct{}, - out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { + out chan<- []byte, nvals int, +) ([]byte, map[peer.ID]struct{}, bool) { numResponses := 0 return dht.processValues(ctx, key, valCh, func(ctx context.Context, v recvdVal, better bool) bool { @@ -217,7 +217,8 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c } func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan recvdVal, - newVal func(ctx context.Context, v recvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { + newVal func(ctx context.Context, v recvdVal, better bool) bool, +) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { loop: for { if aborted { @@ -357,7 +358,6 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st } }, ) - if err != nil { return } @@ -568,7 +568,6 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { - // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.SendingQuery, @@ -670,7 +669,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, return hasValidConnectedness(dht.host, id) }, ) - if err != nil { return peer.AddrInfo{}, err }