eth/downloader: fix unexpected skeleton header deletion #26451

Merged 2 commits on Jan 9, 2023
12 changes: 9 additions & 3 deletions eth/downloader/skeleton.go
@@ -520,7 +520,7 @@ func (s *skeleton) initSync(head *types.Header) {
}
break
}
// If the last subchain can be extended, we're lucky. Otherwise create
// If the last subchain can be extended, we're lucky. Otherwise, create
// a new subchain sync task.
var extended bool
if n := len(s.progress.Subchains); n > 0 {
@@ -977,8 +977,14 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool)
// the expected new sync cycle after some propagated blocks. Log
// it for debugging purposes, explicitly clean and don't escalate.
case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
// Remove the leftover skeleton header associated with old
// skeleton chain only if it's not covered by the current
// skeleton range.
if s.progress.Subchains[1].Head < s.progress.Subchains[0].Tail {
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
}
// Drop the leftover skeleton chain since it's stale.
s.progress.Subchains = s.progress.Subchains[:1]

// If we have more than one header or more than one leftover chain,
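
To see why the new guard matters: skeleton headers are stored keyed by block number, so when the current subchain's range already covers the leftover subchain's head number, that slot on disk now holds a header of the new chain, and deleting it by number would punch a gap into the current range. A minimal runnable sketch of the decision, using simplified stand-in types rather than the real downloader API:

package main

import "fmt"

// subchain mirrors the skeleton's bookkeeping: a contiguous range of
// headers stored on disk, keyed by block number.
type subchain struct {
	Head, Tail uint64
}

func main() {
	const n = 8 // previously synced head; its skeleton header is on disk

	old := subchain{Head: n, Tail: n} // leftover from the finished cycle

	// A new cycle targets a side chain head at N+2 and links at N-2, so
	// headers N-1 and N were overwritten with the side chain's versions
	// and the current subchain owns [N-2, N+2].
	cur := subchain{Head: n + 2, Tail: n - 2}

	// Old behavior: delete skeleton header #old.Head unconditionally,
	// losing the side chain header at N and leaving a gap.
	// Fixed behavior: delete only if the leftover lies below the range
	// owned by the current subchain.
	if old.Head < cur.Tail {
		fmt.Printf("delete leftover skeleton header #%d\n", old.Head)
	} else {
		fmt.Printf("keep header #%d: it now belongs to [%d, %d]\n", old.Head, cur.Tail, cur.Head)
	}
}
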
157 changes: 119 additions & 38 deletions eth/downloader/skeleton_test.go
@@ -37,7 +37,7 @@ import (
type hookedBackfiller struct {
// suspendHook is an optional hook to be called when the filler is requested
// to be suspended.
suspendHook func()
suspendHook func() *types.Header

// resumeHook is an optional hook to be called when the filler is requested
// to be resumed.
Expand All @@ -56,7 +56,7 @@ func newHookedBackfiller() backfiller {
// on initial startup.
func (hf *hookedBackfiller) suspend() *types.Header {
if hf.suspendHook != nil {
hf.suspendHook()
return hf.suspendHook()
}
return nil // we don't really care about header cleanups for now
}
@@ -525,9 +525,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
Number: big.NewInt(int64(i)),
})
}
// Some tests require a forking side chain to trigger cornercases.
var sidechain []*types.Header
for i := 0; i < len(chain)/2; i++ { // Fork at block #5000
sidechain = append(sidechain, chain[i])
}
for i := len(chain) / 2; i < len(chain); i++ {
sidechain = append(sidechain, &types.Header{
ParentHash: sidechain[i-1].Hash(),
Number: big.NewInt(int64(i)),
Extra: []byte("B"), // force a different hash
})
}
tests := []struct {
headers []*types.Header // Database content (beside the genesis)
oldstate []*subchain // Old sync state with various interrupted subchains
fill bool // Whether to run a real backfiller in this test case
unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments

head *types.Header // New head header to announce to reorg to
peers []*skeletonTestPeer // Initial peer set to start the sync with
@@ -760,11 +772,41 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
endserve: 4 * requestHeaders,
},
// This test reproduces a bug caught by (@rjl493456442) where a skeleton
// header goes missing, causing the sync to get stuck and/or panic.
//
// The setup requires a previously successfully synced chain up to a block
// height N. That results in a single skeleton header (block N) and a single
// subchain (Head N, Tail N) being stored on disk.
//
// The following step requires a new sync cycle to a new side chain of a
// height higher than N and a common ancestor lower than N (e.g. head at
// N+2, ancestor at N-2).
// In this scenario, when processing a batch of headers, a link point of
// N-2 will be found, meaning that N-1 and N have been overwritten.
//
// The link event triggers an early exit, noticing that the previous sub-
// chain is a leftover and deletes it (with its skeleton header N). But
// since skeleton header N has been overwritten to the new side chain, we
// end up losing it and creating a gap.
{
fill: true,
unpredictable: true, // We have both good and bad peers; the bad one may be dropped, but the test is too short for certainty

head: chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2
peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)},
midstate: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}},

newHead: sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4
newPeer: newSkeletonTestPeer("test-peer-newchain", sidechain),
endstate: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}},
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
db := rawdb.NewMemoryDatabase()
rawdb.WriteHeader(db, chain[0])

rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
rawdb.WriteReceipts(db, chain[0].Hash(), chain[0].Number.Uint64(), types.Receipts{})

// Create a peer set to feed headers through
peerset := newPeerSet()
Expand All @@ -780,8 +822,43 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
peerset.Unregister(peer)
dropped[peer]++
}
// Create a backfiller if we need to run more advanced tests
filler := newHookedBackfiller()
if tt.fill {
var filled *types.Header

filler = &hookedBackfiller{
resumeHook: func() {
var progress skeletonProgress
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)

for progress.Subchains[0].Tail < progress.Subchains[0].Head {
header := rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)

rawdb.WriteBlock(db, types.NewBlockWithHeader(header))
rawdb.WriteReceipts(db, header.Hash(), header.Number.Uint64(), types.Receipts{})

rawdb.DeleteSkeletonHeader(db, header.Number.Uint64())

progress.Subchains[0].Tail++
progress.Subchains[0].Next = header.Hash()
}
filled = rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)

rawdb.WriteBlock(db, types.NewBlockWithHeader(filled))
rawdb.WriteReceipts(db, filled.Hash(), filled.Number.Uint64(), types.Receipts{})
},

suspendHook: func() *types.Header {
prev := filled
filled = nil

return prev
},
}
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller())
skeleton := newSkeleton(db, peerset, drop, filler)
skeleton.Sync(tt.head, true)

var progress skeletonProgress
@@ -815,19 +892,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
t.Error(err)
continue
}
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
if !tt.unpredictable {
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
}
// Apply the post-init events if there's any
if tt.newHead != nil {
@@ -868,25 +947,27 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
continue
}
// Check that the peers served no more headers than we actually needed
served = 0
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops = 0
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
}
if drops != tt.middrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
if !tt.unpredictable {
served := uint64(0)
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops := uint64(0)
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
}
if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
}
}
// Clean up any leftover skeleton sync resources
skeleton.Terminate()
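
The harness change at the top of this file (suspendHook now returning *types.Header) mirrors the backfiller contract: suspend pauses filling and reports the last header the filler completed (or nil), which the skeleton can then use to clean up stale skeleton headers the filler has already imported. A conforming implementation might look roughly like this sketch; trackingFiller and its fields are hypothetical, only the suspend/resume shape comes from the code above:

package filler

import (
	"sync"

	"github.com/ethereum/go-ethereum/core/types"
)

// trackingFiller is a hypothetical backfiller that remembers the last
// header it imported so suspend can hand it back to the skeleton.
type trackingFiller struct {
	mu     sync.Mutex
	filled *types.Header // last block completed by the filler, nil if none
}

// suspend pauses filling and reports the last completed header; the
// skeleton uses it to prune skeleton headers below that block.
func (f *trackingFiller) suspend() *types.Header {
	f.mu.Lock()
	defer f.mu.Unlock()
	last := f.filled
	f.filled = nil
	return last
}

// resume restarts filling from the current subchain tail.
func (f *trackingFiller) resume() {
	// A real filler would kick off its import goroutine here.
}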