Skip to content

Expose task queue diagnostics #302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-peertaskqueue v0.7.0
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.3
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,9 @@ github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3P
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
Expand Down
13 changes: 13 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ const (
Paused
)

// String returns a lowercase, human-readable name for the request state.
// States other than Queued, Running and Paused yield a fixed
// "unrecognized request state" placeholder.
func (rs RequestState) String() string {
	if rs == Queued {
		return "queued"
	}
	if rs == Running {
		return "running"
	}
	if rs == Paused {
		return "paused"
	}
	return "unrecognized request state"
}

// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
Expand Down
21 changes: 10 additions & 11 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-graphsync/messagequeue"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/executor"
Expand Down Expand Up @@ -458,19 +459,17 @@ func (gs *GraphSync) Stats() graphsync.Stats {
}
}

// PeerStats describes the state of graphsync for a given
type PeerStats struct {
// OutgoingRequests
OutgoingRequests graphsync.RequestStates
// IncomingRequests
IncomingRequests graphsync.RequestStates
// PeerState describes the state of graphsync for a given peer
type PeerState struct {
// OutgoingState is the peer state reported by the request manager.
OutgoingState peerstate.PeerState
// IncomingState is the peer state reported by the response manager.
IncomingState peerstate.PeerState
}

// PeerStats produces insight on the current state of a given peer
func (gs *GraphSync) PeerStats(p peer.ID) PeerStats {
return PeerStats{
OutgoingRequests: gs.requestManager.PeerStats(p),
IncomingRequests: gs.responseManager.PeerStats(p),
// PeerState produces insight on the current state of a given peer by
// combining the request manager's view (outgoing) with the response
// manager's view (incoming).
func (gs *GraphSync) PeerState(p peer.ID) PeerState {
	// Query the request manager first, then the response manager.
	outgoing := gs.requestManager.PeerState(p)
	incoming := gs.responseManager.PeerState(p)
	return PeerState{
		OutgoingState: outgoing,
		IncomingState: incoming,
	}
}

Expand Down
25 changes: 18 additions & 7 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,24 @@ func TestPauseResume(t *testing.T) {
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID())
require.Len(t, requestorPeerStats.OutgoingRequests, 1)
require.Len(t, requestorPeerStats.IncomingRequests, 0)

responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID())
require.Len(t, responderPeerStats.IncomingRequests, 1)
require.Len(t, responderPeerStats.OutgoingRequests, 0)
requestorPeerState := requestor.(*GraphSync).PeerState(td.host2.ID())
require.Len(t, requestorPeerState.OutgoingState.RequestStates, 1)
require.Len(t, requestorPeerState.IncomingState.RequestStates, 0)
require.Len(t, requestorPeerState.OutgoingState.Active, 1)
require.Contains(t, requestorPeerState.OutgoingState.RequestStates, requestorPeerState.OutgoingState.Active[0])
require.Len(t, requestorPeerState.OutgoingState.Pending, 0)
require.Len(t, requestorPeerState.IncomingState.Active, 0)
require.Len(t, requestorPeerState.IncomingState.Pending, 0)
require.Len(t, requestorPeerState.OutgoingState.Diagnostics(), 0)
responderPeerState := responder.(*GraphSync).PeerState(td.host1.ID())
require.Len(t, responderPeerState.IncomingState.RequestStates, 1)
require.Len(t, responderPeerState.OutgoingState.RequestStates, 0)
// no tasks as response is paused by responder
require.Len(t, responderPeerState.IncomingState.Active, 0)
require.Len(t, responderPeerState.IncomingState.Pending, 0)
require.Len(t, responderPeerState.OutgoingState.Active, 0)
require.Len(t, responderPeerState.OutgoingState.Pending, 0)
require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
Expand Down
63 changes: 63 additions & 0 deletions peerstate/peerstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package peerstate

import (
"fmt"

"github.com/ipfs/go-graphsync"
)

// TaskQueueState describes the set of requests for a given peer in a task queue
type TaskQueueState struct {
// Active lists the IDs of requests found in the active task queue.
Active []graphsync.RequestID
// Pending lists the IDs of requests found in the pending task queue.
Pending []graphsync.RequestID
}

// PeerState tracks the overall state of a given peer for either
// incoming or outgoing requests
type PeerState struct {
// RequestStates maps each tracked request ID to its current state.
graphsync.RequestStates
// TaskQueueState holds the request IDs in the active and pending task queues.
TaskQueueState
}

// Diagnostics cross-checks the tracked request states against the task queue
// snapshot and reports every inconsistency found between the two.
//
// The result maps a request ID to the human-readable problems detected for
// that request. A request appears in the result only if at least one problem
// was found; when everything is consistent the returned map is empty (but
// non-nil). The checks are:
//   - every ID in the active queue must have a tracked state, and that state
//     must be Running;
//   - every ID in the pending queue must have a tracked state, and that state
//     must be Queued;
//   - every tracked Running request must be in the active queue;
//   - every tracked Queued request must be in the pending queue.
func (ps PeerState) Diagnostics() map[graphsync.RequestID][]string {
	findings := make(map[graphsync.RequestID][]string)
	// report appends one formatted message to the findings for id.
	report := func(id graphsync.RequestID, format string, args ...interface{}) {
		findings[id] = append(findings[id], fmt.Sprintf(format, args...))
	}

	// IDs seen in each queue that also have a tracked state.
	activeTracked := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates))
	pendingTracked := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates))

	for _, id := range ps.Active {
		state, tracked := ps.RequestStates[id]
		if !tracked {
			report(id, "request with id %d in active task queue but appears to have no tracked state", id)
			continue
		}
		activeTracked[id] = struct{}{}
		if state != graphsync.Running {
			report(id, "expected request with id %d in active task queue to be in running state, but was %s", id, state)
		}
	}

	for _, id := range ps.Pending {
		state, tracked := ps.RequestStates[id]
		if !tracked {
			report(id, "request with id %d in pending task queue but appears to have no tracked state", id)
			continue
		}
		pendingTracked[id] = struct{}{}
		if state != graphsync.Queued {
			report(id, "expected request with id %d in pending task queue to be in queued state, but was %s", id, state)
		}
	}

	// Reverse direction: tracked requests that should be in a queue but are not.
	for id, state := range ps.RequestStates {
		switch state {
		case graphsync.Running:
			if _, found := activeTracked[id]; !found {
				report(id, "request with id %d in running state is not in the active task queue", id)
			}
		case graphsync.Queued:
			if _, found := pendingTracked[id]; !found {
				report(id, "request with id %d in queued state is not in the pending task queue", id)
			}
		}
	}
	return findings
}
139 changes: 139 additions & 0 deletions peerstate/peerstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package peerstate_test

import (
"fmt"
"math/rand"
"testing"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/stretchr/testify/require"
)

// TestDiagnostics is a table-driven test for peerstate.PeerState.Diagnostics.
// Each case pairs a set of tracked request states with a task queue snapshot
// and lists the exact diagnostics expected for every inconsistent request.
// Request IDs are drawn from math/rand, so the expected messages are built
// with fmt.Sprintf from the same IDs.
func TestDiagnostics(t *testing.T) {
	requestIDs := make([]graphsync.RequestID, 0, 5)
	for i := 0; i < 5; i++ {
		requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31()))
	}
	testCases := map[string]struct {
		requestStates       graphsync.RequestStates
		queueStats          peerstate.TaskQueueState
		expectedDiagnostics map[graphsync.RequestID][]string
	}{
		"all requests and queue match": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{},
		},
		"active task with with incorrect state": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Queued,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[1]: {
					fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]),
					fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[1]),
				},
				requestIDs[4]: {
					fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]),
				},
			},
		},
		"active task with no state": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[1]: {
					fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]),
				},
			},
		},
		"pending task with with incorrect state": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Running,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[3]: {
					fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]),
					fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[3]),
				},
				requestIDs[4]: {
					fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]),
				},
			},
		},
		"pending task with no state": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[3]: {
					fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]),
				},
			},
		},
		"request state running with no active task": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0]},
				Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[1]: {
					fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]),
				},
			},
		},
		"request state queued with no pending task": {
			requestStates: graphsync.RequestStates{
				requestIDs[0]: graphsync.Running,
				requestIDs[1]: graphsync.Running,
				requestIDs[2]: graphsync.Queued,
				requestIDs[3]: graphsync.Queued,
				requestIDs[4]: graphsync.Paused,
			},
			queueStats: peerstate.TaskQueueState{
				Active:  []graphsync.RequestID{requestIDs[0], requestIDs[1]},
				Pending: []graphsync.RequestID{requestIDs[2]},
			},
			expectedDiagnostics: map[graphsync.RequestID][]string{
				requestIDs[3]: {
					fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]),
				},
			},
		},
	}
	for testCase, data := range testCases {
		t.Run(testCase, func(t *testing.T) {
			// Keyed fields keep this literal valid if PeerState gains or
			// reorders fields, and satisfy go vet's composites check.
			state := peerstate.PeerState{
				RequestStates:  data.requestStates,
				TaskQueueState: data.queueStats,
			}
			require.Equal(t, data.expectedDiagnostics, state.Diagnostics())
		})
	}
}
15 changes: 8 additions & 7 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/requestmanager/types"
Expand Down Expand Up @@ -332,15 +333,15 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
}
}

// PeerStats gets stats on all outgoing requests for a given peer
func (rm *RequestManager) PeerStats(p peer.ID) graphsync.RequestStates {
response := make(chan graphsync.RequestStates)
rm.send(&peerStatsMessage{p, response}, nil)
// PeerState gets stats on all outgoing requests for a given peer
func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
rm.send(&peerStateMessage{p, response}, nil)
select {
case <-rm.ctx.Done():
return nil
case peerStats := <-response:
return peerStats
return peerstate.PeerState{}
case peerState := <-response:
return peerState
}
}

Expand Down
7 changes: 4 additions & 3 deletions requestmanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-graphsync"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/requestmanager/executor"
)

Expand Down Expand Up @@ -109,12 +110,12 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) {
}
}

type peerStatsMessage struct {
type peerStateMessage struct {
p peer.ID
peerStatsChan chan<- graphsync.RequestStates
peerStatsChan chan<- peerstate.PeerState
}

func (psm *peerStatsMessage) handle(rm *RequestManager) {
func (psm *peerStateMessage) handle(rm *RequestManager) {
peerStats := rm.peerStats(psm.p)
select {
case psm.peerStatsChan <- peerStats:
Expand Down
12 changes: 8 additions & 4 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,10 +994,14 @@ func TestStats(t *testing.T) {

requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3)

states := td.requestManager.PeerStats(peers[0])
require.Len(t, states, 2)
require.Equal(t, states[requestRecords[0].gsr.ID()], graphsync.Running)
require.Equal(t, states[requestRecords[1].gsr.ID()], graphsync.Running)
peerState := td.requestManager.PeerState(peers[0])
require.Len(t, peerState.RequestStates, 2)
require.Equal(t, peerState.RequestStates[requestRecords[0].gsr.ID()], graphsync.Running)
require.Equal(t, peerState.RequestStates[requestRecords[1].gsr.ID()], graphsync.Running)
require.Len(t, peerState.Active, 2)
require.Contains(t, peerState.Active, requestRecords[0].gsr.ID())
require.Contains(t, peerState.Active, requestRecords[1].gsr.ID())
require.Len(t, peerState.Pending, 0)
}

type requestRecord struct {
Expand Down
Loading