@@ -16,7 +16,7 @@ use ethrex_storage::{error::StoreError, Store, STATE_TRIE_SEGMENTS};
16
16
use ethrex_trie:: { Nibbles , Node , TrieError , TrieState } ;
17
17
use state_healing:: heal_state_trie;
18
18
use state_sync:: state_sync;
19
- use std:: { array, sync:: Arc } ;
19
+ use std:: { array, collections :: HashMap , sync:: Arc } ;
20
20
use storage_healing:: storage_healer;
21
21
use tokio:: {
22
22
sync:: {
@@ -26,7 +26,7 @@ use tokio::{
26
26
time:: { Duration , Instant } ,
27
27
} ;
28
28
use tokio_util:: sync:: CancellationToken ;
29
- use tracing:: { debug, info, warn} ;
29
+ use tracing:: { debug, error , info, warn} ;
30
30
use trie_rebuild:: TrieRebuilder ;
31
31
32
32
use crate :: {
@@ -78,6 +78,14 @@ pub struct SyncManager {
78
78
/// Syncing beyond this pivot should re-enable snap-sync (as we will not have that state stored)
79
79
/// TODO: Reorgs
80
80
last_snap_pivot : u64 ,
81
+ /// The `forkchoice_update` and `new_payload` methods require the `latest_valid_hash`
82
+ /// when processing an invalid payload. To provide this, we must track invalid chains.
83
+ ///
84
+ /// We only store the last known valid head upon encountering a bad block,
85
+ /// rather than tracking every subsequent invalid block.
86
+ ///
87
+ /// This map stores the bad block hash with and latest valid block hash of the chain corresponding to the bad block
88
+ pub invalid_ancestors : HashMap < BlockHash , BlockHash > ,
81
89
trie_rebuilder : Option < TrieRebuilder > ,
82
90
// Used for cancelling long-living tasks upon shutdown
83
91
cancel_token : CancellationToken ,
@@ -93,6 +101,7 @@ impl SyncManager {
93
101
sync_mode,
94
102
peers : PeerHandler :: new ( peer_table) ,
95
103
last_snap_pivot : 0 ,
104
+ invalid_ancestors : HashMap :: new ( ) ,
96
105
trie_rebuilder : None ,
97
106
cancel_token,
98
107
}
@@ -106,6 +115,7 @@ impl SyncManager {
106
115
sync_mode : SyncMode :: Full ,
107
116
peers : PeerHandler :: new ( dummy_peer_table) ,
108
117
last_snap_pivot : 0 ,
118
+ invalid_ancestors : HashMap :: new ( ) ,
109
119
trie_rebuilder : None ,
110
120
// This won't be used
111
121
cancel_token : CancellationToken :: new ( ) ,
@@ -155,6 +165,12 @@ impl SyncManager {
155
165
current_head = last_header;
156
166
}
157
167
}
168
+
169
+ let pending_block = match store. get_pending_block ( sync_head) {
170
+ Ok ( res) => res,
171
+ Err ( e) => return Err ( e. into ( ) ) ,
172
+ } ;
173
+
158
174
loop {
159
175
debug ! ( "Requesting Block Headers from {current_head}" ) ;
160
176
// Request Block Headers from Peer
@@ -167,12 +183,23 @@ impl SyncManager {
167
183
debug ! (
168
184
"Received {} block headers| Last Number: {}" ,
169
185
block_headers. len( ) ,
170
- block_headers. last( ) . as_ref( ) . unwrap( ) . number
186
+ block_headers. last( ) . as_ref( ) . unwrap( ) . number,
171
187
) ;
172
188
let mut block_hashes = block_headers
173
189
. iter ( )
174
190
. map ( |header| header. compute_block_hash ( ) )
175
191
. collect :: < Vec < _ > > ( ) ;
192
+ let last_header = block_headers. last ( ) . unwrap ( ) . clone ( ) ;
193
+
194
+ // If we have a pending block from new_payload request
195
+ // attach it to the end if it matches the parent_hash of the latest received header
196
+ if let Some ( ref block) = pending_block {
197
+ if block. header . parent_hash == last_header. compute_block_hash ( ) {
198
+ block_hashes. push ( block. hash ( ) ) ;
199
+ block_headers. push ( block. header . clone ( ) ) ;
200
+ }
201
+ }
202
+
176
203
// Check if we already found the sync head
177
204
let sync_head_found = block_hashes. contains ( & sync_head) ;
178
205
// Update current fetch head if needed
@@ -185,9 +212,8 @@ impl SyncManager {
185
212
store. set_header_download_checkpoint ( current_head) ?;
186
213
} else {
187
214
// If the sync head is less than 64 blocks away from our current head switch to full-sync
188
- let last_header_number = block_headers. last ( ) . unwrap ( ) . number ;
189
215
let latest_block_number = store. get_latest_block_number ( ) ?;
190
- if last_header_number . saturating_sub ( latest_block_number)
216
+ if last_header . number . saturating_sub ( latest_block_number)
191
217
< MIN_FULL_BLOCKS as u64
192
218
{
193
219
// Too few blocks for a snap sync, switching to full sync
@@ -262,7 +288,13 @@ impl SyncManager {
262
288
}
263
289
SyncMode :: Full => {
264
290
// full-sync: Fetch all block bodies and execute them sequentially to build the state
265
- download_and_run_blocks ( all_block_hashes, self . peers . clone ( ) , store. clone ( ) ) . await ?
291
+ download_and_run_blocks (
292
+ all_block_hashes,
293
+ self . peers . clone ( ) ,
294
+ store. clone ( ) ,
295
+ & mut self . invalid_ancestors ,
296
+ )
297
+ . await ?
266
298
}
267
299
}
268
300
Ok ( ( ) )
@@ -275,7 +307,9 @@ async fn download_and_run_blocks(
275
307
mut block_hashes : Vec < BlockHash > ,
276
308
peers : PeerHandler ,
277
309
store : Store ,
310
+ invalid_ancestors : & mut HashMap < BlockHash , BlockHash > ,
278
311
) -> Result < ( ) , SyncError > {
312
+ let mut last_valid_hash = H256 :: default ( ) ;
279
313
loop {
280
314
debug ! ( "Requesting Block Bodies " ) ;
281
315
if let Some ( block_bodies) = peers. request_block_bodies ( block_hashes. clone ( ) ) . await {
@@ -292,11 +326,12 @@ async fn download_and_run_blocks(
292
326
let number = header. number ;
293
327
let block = Block :: new ( header, body) ;
294
328
if let Err ( error) = ethrex_blockchain:: add_block ( & block, & store) {
295
- warn ! ( "Failed to add block during FullSync: {error}" ) ;
329
+ invalid_ancestors . insert ( hash , last_valid_hash ) ;
296
330
return Err ( error. into ( ) ) ;
297
331
}
298
332
store. set_canonical_block ( number, hash) ?;
299
333
store. update_latest_block_number ( number) ?;
334
+ last_valid_hash = hash;
300
335
}
301
336
info ! ( "Executed & stored {} blocks" , block_bodies_len) ;
302
337
// Check if we need to ask for another batch
0 commit comments