Skip to content

Commit b442027

Browse files
authored
Accept missing blocks without failing data transfer (#291)
* feat(channels): add missing cids field * feat(graphsync): handle missings cids via events, integration test * style(lint): mod tidy * feat(channels): put retrievals with missing CIDs in "partially complete" * style(graphsync): add explanatory comment * feat(deps): update to v0.12.0 graphsync * ci(circle): update to go 1.16 * build(go.mod): update to go 1.16
1 parent 3fc3bba commit b442027

22 files changed

+505
-88
lines changed

.circleci/config.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
version: 2.1
22
orbs:
3-
go: gotest/[email protected].9
3+
go: gotest/[email protected].14
44
codecov: codecov/[email protected]
55

66
executors:
77
golang:
88
docker:
9-
- image: circleci/golang:1.15
9+
- image: circleci/golang:1.16
1010
resource_class: small
1111

1212
commands:

channelmonitor/channelmonitor_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -621,3 +621,7 @@ func (m *mockChannelState) ReceivedCidsLen() int {
621621
func (m *mockChannelState) ReceivedCidsTotal() int64 {
622622
panic("implement me")
623623
}
624+
625+
func (m *mockChannelState) MissingCids() []cid.Cid {
626+
panic("implement me")
627+
}

channels/channel_state.go

+8
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ type channelState struct {
5656
// stages tracks the timeline of events related to a data transfer, for
5757
// traceability purposes.
5858
stages *datatransfer.ChannelStages
59+
60+
// missingCids are the set of CIDS that were missing and skipped over in the data transfer
61+
missingCids []cid.Cid
5962
}
6063

6164
// EmptyChannelState is the zero value for channel state, meaning not present
@@ -193,6 +196,10 @@ func (c channelState) OtherPeer() peer.ID {
193196
return c.sender
194197
}
195198

199+
func (c channelState) MissingCids() []cid.Cid {
200+
return c.missingCids
201+
}
202+
196203
// Stages returns the current ChannelStages object, or an empty object.
197204
// It is unsafe for the caller to modify the return value, and changes may not
198205
// be persisted. It should be treated as immutable.
@@ -230,6 +237,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
230237
voucherDecoder: voucherDecoder,
231238
receivedCids: receivedCidsReader,
232239
stages: c.Stages,
240+
missingCids: c.MissingCids,
233241
}
234242
}
235243

channels/channels.go

+5
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,11 @@ func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) erro
373373
return c.send(chid, datatransfer.ReceiveDataError, err)
374374
}
375375

376+
// CIDMissing indicates the sender is missing a section of the graph in the response
377+
func (c *Channels) CIDMissing(chid datatransfer.ChannelID, cid cid.Cid) error {
378+
return c.send(chid, datatransfer.CIDMissing, cid)
379+
}
380+
376381
// HasChannel returns true if the given channel id is being tracked
377382
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
378383
return c.stateMachines.Has(chid)

channels/channels_fsm.go

+29
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package channels
22

33
import (
4+
"github.com/ipfs/go-cid"
45
logging "github.com/ipfs/go-log/v2"
56
cbg "github.com/whyrusleeping/cbor-gen"
67

@@ -103,6 +104,24 @@ var ChannelEvents = fsm.Events{
103104
chst.AddLog("")
104105
return nil
105106
}),
107+
108+
fsm.Event(datatransfer.CIDMissing).FromMany(transferringStates...).ToJustRecord().
109+
Action(func(chst *internal.ChannelState, missing cid.Cid) error {
110+
// TODO: find a more efficient way to do this
111+
var found bool
112+
for _, c := range chst.MissingCids {
113+
if c.Equals(missing) {
114+
found = true
115+
break
116+
}
117+
}
118+
if !found {
119+
chst.MissingCids = append(chst.MissingCids, missing)
120+
}
121+
chst.AddLog("")
122+
return nil
123+
}),
124+
106125
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
107126
chst.Message = err.Error()
108127
chst.AddLog("data transfer disconnected: %s", chst.Message)
@@ -225,6 +244,12 @@ var ChannelEvents = fsm.Events{
225244
return nil
226245
}),
227246

247+
fsm.Event(datatransfer.CleanupCompletePartial).
248+
From(datatransfer.Completing).To(datatransfer.PartiallyCompleted).Action(func(chst *internal.ChannelState) error {
249+
chst.AddLog("")
250+
return nil
251+
}),
252+
228253
// will kickoff state handlers for channels that were cleaning up
229254
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
230255
chst.AddLog("")
@@ -247,6 +272,9 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
247272
}
248273
env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder})
249274
env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String())
275+
if channel.Status == datatransfer.Completing && len(channel.MissingCids) > 0 {
276+
return ctx.Trigger(datatransfer.CleanupCompletePartial)
277+
}
250278
return ctx.Trigger(datatransfer.CleanupComplete)
251279
}
252280

@@ -262,6 +290,7 @@ var ChannelFinalityStates = []fsm.StateKey{
262290
datatransfer.Cancelled,
263291
datatransfer.Completed,
264292
datatransfer.Failed,
293+
datatransfer.PartiallyCompleted,
265294
}
266295

267296
// IsChannelTerminated returns true if the channel is in a finality state

channels/channels_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,37 @@ func TestChannels(t *testing.T) {
241241
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
242242
})
243243

244+
t.Run("missing cids", func(t *testing.T) {
245+
ds := dss.MutexWrap(datastore.NewMapDatastore())
246+
dir := os.TempDir()
247+
cidLists, err := cidlists.NewCIDLists(dir)
248+
require.NoError(t, err)
249+
channelList, err := channels.New(ds, cidLists, notifier, decoderByType, decoderByType, &fakeEnv{}, peers[0])
250+
require.NoError(t, err)
251+
err = channelList.Start(ctx)
252+
require.NoError(t, err)
253+
254+
_, err = channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
255+
require.NoError(t, err)
256+
state := checkEvent(ctx, t, received, datatransfer.Open)
257+
require.Equal(t, datatransfer.Requested, state.Status())
258+
259+
err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0])
260+
require.NoError(t, err)
261+
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
262+
require.Equal(t, []cid.Cid{cids[0]}, state.MissingCids())
263+
264+
err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1])
265+
require.NoError(t, err)
266+
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
267+
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids())
268+
269+
err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0])
270+
require.NoError(t, err)
271+
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
272+
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids())
273+
})
274+
244275
t.Run("pause/resume", func(t *testing.T) {
245276
state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
246277
require.NoError(t, err)

channels/internal/internalchannel.go

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type ChannelState struct {
6868
//
6969
// EXPERIMENTAL; subject to change.
7070
Stages *datatransfer.ChannelStages
71+
72+
// MissingCids are the set of CIDS that were missing and skipped over in the data transfer
73+
MissingCids []cid.Cid
7174
}
7275

7376
// AddLog takes an fmt string with arguments, and adds the formatted string to

channels/internal/internalchannel_cbor_gen.go

+54-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

events.go

+9
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ const (
110110

111111
// Opened is fired when a request for data is sent from this node to a peer
112112
Opened
113+
114+
// CIDMissing is fired when the sender is missing a section of the graph in the response
115+
CIDMissing
116+
117+
// CleanupCompletePartial causes a completing request to transition to a PartiallyCompleted state
118+
// rather than a full Completed state
119+
CleanupCompletePartial
113120
)
114121

115122
// Events are human readable names for data transfer events
@@ -144,6 +151,8 @@ var Events = map[EventCode]string{
144151
ReceiveDataError: "ReceiveDataError",
145152
TransferRequestQueued: "TransferRequestQueued",
146153
RequestCancelled: "RequestCancelled",
154+
CIDMissing: "CIDMissing",
155+
CleanupCompletePartial: "CleanupCompletePartial",
147156
}
148157

149158
// Event is a struct containing information about a data transfer event

go.mod

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/filecoin-project/go-data-transfer
22

3-
go 1.15
3+
go 1.16
44

55
require (
66
github.com/bep/debounce v1.2.0
@@ -10,10 +10,10 @@ require (
1010
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
1111
github.com/ipfs/go-block-format v0.0.3
1212
github.com/ipfs/go-blockservice v0.2.1
13-
github.com/ipfs/go-cid v0.0.7
13+
github.com/ipfs/go-cid v0.1.0
1414
github.com/ipfs/go-datastore v0.5.1
1515
github.com/ipfs/go-ds-badger v0.3.0
16-
github.com/ipfs/go-graphsync v0.11.5
16+
github.com/ipfs/go-graphsync v0.12.0
1717
github.com/ipfs/go-ipfs-blockstore v1.1.2
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -24,8 +24,8 @@ require (
2424
github.com/ipfs/go-ipld-format v0.2.0
2525
github.com/ipfs/go-log/v2 v2.3.0
2626
github.com/ipfs/go-merkledag v0.5.1
27-
github.com/ipfs/go-unixfs v0.2.4
28-
github.com/ipld/go-ipld-prime v0.14.3
27+
github.com/ipfs/go-unixfs v0.3.1
28+
github.com/ipld/go-ipld-prime v0.14.4
2929
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
3030
github.com/jpillora/backoff v1.0.0
3131
github.com/libp2p/go-libp2p v0.16.0
@@ -37,7 +37,7 @@ require (
3737
go.opentelemetry.io/otel/sdk v1.2.0
3838
go.opentelemetry.io/otel/trace v1.3.0
3939
go.uber.org/atomic v1.9.0
40-
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
40+
golang.org/x/exp v0.0.0-20210615023648-acb5c1269671
4141
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
4242
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
4343
)

0 commit comments

Comments
 (0)