Skip to content

Commit e24e96a

Browse files
rvagghannahhoward
authored andcommitted
Merge commits from main to v0.10.x release branch
feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests (#284) * feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests * fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests fix(responsemanager): fix flaky tests fix(responsemanager): make fix more global feat: add basic OT tracing for incoming requests Closes: #271 docs(tests): document tracing test helper utilities fix(test): increase 1s timeouts to 2s for slow CI (#289) * fix(test): increase 1s timeouts to 2s for slow CI * fixup! fix(test): increase 1s timeouts to 2s for slow CI testutil/chaintypes: simplify maintenance of codegen (#294) "go generate" now updates the generated code for us. The separate directory for a main package was unnecessary; a build-tag-ignored file is enough. Using gofmt on the resulting source is now unnecessary too, as upstream has been using go/format on its output for some time. Finally, re-generate the output source code, as the last time that was done we were on an older ipld-prime. ipldutil: use chooser APIs from dagpb and basicnode (#292) Saves us a bit of extra code, since they were added in summer. Also avoid making defaultVisitor a variable, which makes it clearer that it's never a nil func. While here, replace node/basic with node/basicnode, as the former has been deprecated in favor of the latter. Co-authored-by: Hannah Howard <[email protected]> fix: use sync.Cond to handle no-task blocking wait (#299) Ref: #284 Peer Stats function (#298) * feat(graphsync): add impl method for peer stats add method that gets current request states by request ID for a given peer * fix(requestmanager): fix tested method Add a bit of logging (#301) * chore(responsemanager): add a bit of logging * fix(responsemanager): remove code change chore: short-circuit unnecessary message processing Expose task queue diagnostics (#302) * feat(impl): expose task queue diagnostics * refactor(peerstate): put peerstate in its own module * refactor(peerstate): make diagnostics return array
1 parent 2925810 commit e24e96a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1417
-408
lines changed

benchmarks/benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/ipfs/go-unixfs/importer/balanced"
2727
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
2828
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
29-
basicnode "github.com/ipld/go-ipld-prime/node/basic"
29+
"github.com/ipld/go-ipld-prime/node/basicnode"
3030
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
3131
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
3232
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

cidset/cidset.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/ipfs/go-cid"
77
"github.com/ipld/go-ipld-prime/fluent"
88
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
9-
basicnode "github.com/ipld/go-ipld-prime/node/basic"
9+
"github.com/ipld/go-ipld-prime/node/basicnode"
1010

1111
"github.com/ipfs/go-graphsync/ipldutil"
1212
)

dedupkey/dedupkey.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dedupkey
22

33
import (
4-
basicnode "github.com/ipld/go-ipld-prime/node/basic"
4+
"github.com/ipld/go-ipld-prime/node/basicnode"
55

66
"github.com/ipfs/go-graphsync/ipldutil"
77
)

donotsendfirstblocks/donotsendfirstblocks.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package donotsendfirstblocks
22

33
import (
4-
basicnode "github.com/ipld/go-ipld-prime/node/basic"
4+
"github.com/ipld/go-ipld-prime/node/basicnode"
55

66
"github.com/ipfs/go-graphsync/ipldutil"
77
)

go.mod

+6-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ require (
2222
github.com/ipfs/go-ipld-format v0.2.0
2323
github.com/ipfs/go-log/v2 v2.1.1
2424
github.com/ipfs/go-merkledag v0.3.2
25-
github.com/ipfs/go-peertaskqueue v0.6.0
25+
github.com/ipfs/go-peertaskqueue v0.7.1
2626
github.com/ipfs/go-unixfs v0.2.4
2727
github.com/ipld/go-codec-dagpb v1.3.0
2828
github.com/ipld/go-ipld-prime v0.12.3
@@ -35,9 +35,12 @@ require (
3535
github.com/libp2p/go-msgio v0.0.6
3636
github.com/multiformats/go-multiaddr v0.3.1
3737
github.com/multiformats/go-multihash v0.0.15
38-
github.com/stretchr/testify v1.6.1
38+
github.com/stretchr/testify v1.7.0
3939
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
40-
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
40+
go.opentelemetry.io/otel v1.2.0
41+
go.opentelemetry.io/otel/sdk v1.2.0
42+
go.opentelemetry.io/otel/trace v1.2.0
43+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
4144
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
4245
google.golang.org/protobuf v1.27.1
4346
)

go.sum

+16-7
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
9999
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
100100
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
101101
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
102-
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
103102
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
103+
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
104+
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
104105
github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY=
105106
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
106107
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
@@ -228,8 +229,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
228229
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
229230
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
230231
github.com/ipfs/go-peertaskqueue v0.1.1/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
231-
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
232-
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
232+
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
233+
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
233234
github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
234235
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
235236
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
@@ -616,8 +617,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
616617
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
617618
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
618619
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
619-
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
620620
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
621+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
622+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
621623
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
622624
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
623625
github.com/urfave/cli/v2 v2.0.0 h1:+HU9SCbu8GnEUFtIBfuUNXN39ofWViIEJIp6SURMpCg=
@@ -650,6 +652,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
650652
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
651653
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
652654
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
655+
go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ=
656+
go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I=
657+
go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo=
658+
go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U=
659+
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
660+
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
653661
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
654662
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
655663
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -708,8 +716,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
708716
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
709717
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
710718
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
711-
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
712-
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
719+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
720+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
713721
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
714722
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
715723
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -732,8 +740,9 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
732740
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
733741
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
734742
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
735-
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
736743
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
744+
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
745+
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
737746
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
738747
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
739748
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=

graphsync.go

+28
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,34 @@ type Stats struct {
332332
OutgoingResponses ResponseStats
333333
}
334334

335+
// RequestState describes the current general state of a request
336+
type RequestState uint64
337+
338+
// RequestStates describe a set of request IDs and their current state
339+
type RequestStates map[RequestID]RequestState
340+
341+
const (
342+
// Queued means a request has been received and is queued for processing
343+
Queued RequestState = iota
344+
// Running means a request is actively sending or receiving data
345+
Running
346+
// Paused means a request is paused
347+
Paused
348+
)
349+
350+
func (rs RequestState) String() string {
351+
switch rs {
352+
case Queued:
353+
return "queued"
354+
case Running:
355+
return "running"
356+
case Paused:
357+
return "paused"
358+
default:
359+
return "unrecognized request state"
360+
}
361+
}
362+
335363
// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
336364
type GraphExchange interface {
337365
// Request initiates a new GraphSync request to the given peer using the given selector spec.

impl/graphsync.go

+38-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"github.com/ipfs/go-peertaskqueue"
99
ipld "github.com/ipld/go-ipld-prime"
1010
"github.com/libp2p/go-libp2p-core/peer"
11+
"go.opentelemetry.io/otel"
12+
"go.opentelemetry.io/otel/attribute"
13+
"go.opentelemetry.io/otel/trace"
1114

1215
"github.com/ipfs/go-graphsync"
1316
"github.com/ipfs/go-graphsync/allocator"
@@ -16,6 +19,7 @@ import (
1619
"github.com/ipfs/go-graphsync/messagequeue"
1720
gsnet "github.com/ipfs/go-graphsync/network"
1821
"github.com/ipfs/go-graphsync/peermanager"
22+
"github.com/ipfs/go-graphsync/peerstate"
1923
"github.com/ipfs/go-graphsync/requestmanager"
2024
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
2125
"github.com/ipfs/go-graphsync/requestmanager/executor"
@@ -304,6 +308,15 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
304308

305309
// Request initiates a new GraphSync request to the given peer using the given selector spec.
306310
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
311+
var extNames []string
312+
for _, ext := range extensions {
313+
extNames = append(extNames, string(ext.Name))
314+
}
315+
ctx, _ = otel.Tracer("graphsync").Start(ctx, "request", trace.WithAttributes(
316+
attribute.String("peerID", p.Pretty()),
317+
attribute.String("root", root.String()),
318+
attribute.StringSlice("extensions", extNames),
319+
))
307320
return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...)
308321
}
309322

@@ -446,6 +459,20 @@ func (gs *GraphSync) Stats() graphsync.Stats {
446459
}
447460
}
448461

462+
// PeerState describes the state of graphsync for a given peer
463+
type PeerState struct {
464+
OutgoingState peerstate.PeerState
465+
IncomingState peerstate.PeerState
466+
}
467+
468+
// PeerState produces insight on the current state of a given peer
469+
func (gs *GraphSync) PeerState(p peer.ID) PeerState {
470+
return PeerState{
471+
OutgoingState: gs.requestManager.PeerState(p),
472+
IncomingState: gs.responseManager.PeerState(p),
473+
}
474+
}
475+
449476
type graphSyncReceiver GraphSync
450477

451478
func (gsr *graphSyncReceiver) graphSync() *GraphSync {
@@ -458,8 +485,17 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
458485
ctx context.Context,
459486
sender peer.ID,
460487
incoming gsmsg.GraphSyncMessage) {
461-
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
462-
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
488+
489+
requests := incoming.Requests()
490+
responses := incoming.Responses()
491+
blocks := incoming.Blocks()
492+
493+
if len(requests) > 0 {
494+
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, requests)
495+
}
496+
if len(responses) > 0 || len(blocks) > 0 {
497+
gsr.graphSync().requestManager.ProcessResponses(sender, responses, blocks)
498+
}
463499
}
464500

465501
// ReceiveError is part of the network's Receiver interface and handles incoming

0 commit comments

Comments
 (0)