Skip to content

Commit b63e3c3

Browse files
core: improve snapshot journal recovery (#21594)
* core/state/snapshot: introduce snapshot journal version * core: update the disk layer in an atomic way * core: persist the disk layer generator periodically * core/state/snapshot: improve logging * core/state/snapshot: forcibly ensure the legacy snapshot is matched * core/state/snapshot: add debug logs * core, tests: fix tests and special recovery case * core: polish * core: add more blockchain tests for snapshot recovery * core/state: fix comment * core: add recovery flag for snapshot * core: add restart after start-after-crash tests * core/rawdb: fix imports * core: fix tests * core: remove log * core/state/snapshot: fix snapshot * core: avoid callbacks in SetHead * core: fix setHead cornercase where the threshold root has state * core: small docs for the test cases Co-authored-by: Péter Szilágyi <[email protected]>
1 parent 43c278c commit b63e3c3

11 files changed

+1792
-159
lines changed

core/blockchain.go

+79-13
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,10 @@ type BlockChain struct {
207207
processor Processor // Block transaction processor interface
208208
vmConfig vm.Config
209209

210-
badBlocks *lru.Cache // Bad block cache
211-
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
212-
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
210+
badBlocks *lru.Cache // Bad block cache
211+
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
212+
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
213+
writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format.
213214
}
214215

215216
// NewBlockChain returns a fully initialised block chain using information
@@ -281,9 +282,29 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
281282
// Make sure the state associated with the block is available
282283
head := bc.CurrentBlock()
283284
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
284-
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
285-
if err := bc.SetHead(head.NumberU64()); err != nil {
286-
return nil, err
285+
// Head state is missing, before the state recovery, find out the
286+
// disk layer point of snapshot (if it's enabled). Make sure the
287+
// rewound point is lower than disk layer.
288+
var diskRoot common.Hash
289+
if bc.cacheConfig.SnapshotLimit > 0 {
290+
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
291+
}
292+
if diskRoot != (common.Hash{}) {
293+
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash(), "snaproot", diskRoot)
294+
295+
snapDisk, err := bc.SetHeadBeyondRoot(head.NumberU64(), diskRoot)
296+
if err != nil {
297+
return nil, err
298+
}
299+
// Chain rewound, persist old snapshot number to indicate recovery procedure
300+
if snapDisk != 0 {
301+
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
302+
}
303+
} else {
304+
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
305+
if err := bc.SetHead(head.NumberU64()); err != nil {
306+
return nil, err
307+
}
287308
}
288309
}
289310
// Ensure that a previous crash in SetHead doesn't leave extra ancients
@@ -339,7 +360,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
339360
}
340361
// Load any existing snapshot, regenerating it if loading failed
341362
if bc.cacheConfig.SnapshotLimit > 0 {
342-
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
363+
// If the chain was rewound past the snapshot persistent layer (causing
364+
// a recovery block number to be persisted to disk), check if we're still
365+
// in recovery mode and in that case, don't invalidate the snapshot on a
366+
// head mismatch.
367+
var recover bool
368+
369+
head := bc.CurrentBlock()
370+
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer > head.NumberU64() {
371+
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
372+
recover = true
373+
}
374+
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, recover)
343375
}
344376
// Take ownership of this particular state
345377
go bc.update()
@@ -444,9 +476,25 @@ func (bc *BlockChain) loadLastState() error {
444476
// was fast synced or full synced and in which state, the method will try to
445477
// delete minimal data from disk whilst retaining chain consistency.
446478
func (bc *BlockChain) SetHead(head uint64) error {
479+
_, err := bc.SetHeadBeyondRoot(head, common.Hash{})
480+
return err
481+
}
482+
483+
// SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition
484+
// that the rewind must pass the specified state root. This method is meant to be
485+
// used when rewinding with snapshots enabled to ensure that we go back further than
486+
// persistent disk layer. Depending on whether the node was fast synced or full, and
487+
// in which state, the method will try to delete minimal data from disk whilst
488+
// retaining chain consistency.
489+
//
490+
// The method returns the block number where the requested root cap was found.
491+
func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
447492
bc.chainmu.Lock()
448493
defer bc.chainmu.Unlock()
449494

495+
// Track the block number of the requested root hash
496+
var rootNumber uint64 // (no root == always 0)
497+
450498
// Retrieve the last pivot block to short circuit rollbacks beyond it and the
451499
// current freezer limit to start nuking if underflown
452500
pivot := rawdb.ReadLastPivotNumber(bc.db)
@@ -462,8 +510,16 @@ func (bc *BlockChain) SetHead(head uint64) error {
462510
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
463511
newHeadBlock = bc.genesisBlock
464512
} else {
465-
// Block exists, keep rewinding until we find one with state
513+
// Block exists, keep rewinding until we find one with state,
514+
// keep rewinding until we exceed the optional threshold
515+
// root hash
516+
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
517+
466518
for {
519+
// If a root threshold was requested but not yet crossed, check
520+
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
521+
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
522+
}
467523
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
468524
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
469525
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
@@ -474,8 +530,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
474530
newHeadBlock = bc.genesisBlock
475531
}
476532
}
477-
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
478-
break
533+
if beyondRoot || newHeadBlock.NumberU64() == 0 {
534+
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
535+
break
536+
}
537+
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
538+
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) // Keep rewinding
479539
}
480540
}
481541
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
@@ -555,7 +615,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
555615
bc.txLookupCache.Purge()
556616
bc.futureBlocks.Purge()
557617

558-
return bc.loadLastState()
618+
return rootNumber, bc.loadLastState()
559619
}
560620

561621
// FastSyncCommitHead sets the current head block to the one defined by the hash
@@ -940,8 +1000,14 @@ func (bc *BlockChain) Stop() {
9401000
var snapBase common.Hash
9411001
if bc.snaps != nil {
9421002
var err error
943-
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
944-
log.Error("Failed to journal state snapshot", "err", err)
1003+
if bc.writeLegacyJournal {
1004+
if snapBase, err = bc.snaps.LegacyJournal(bc.CurrentBlock().Root()); err != nil {
1005+
log.Error("Failed to journal state snapshot", "err", err)
1006+
}
1007+
} else {
1008+
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
1009+
log.Error("Failed to journal state snapshot", "err", err)
1010+
}
9451011
}
9461012
}
9471013
// Ensure the state of a recent block is also stored to disk before exiting.

0 commit comments

Comments
 (0)