18
18
import struct
19
19
from collections import OrderedDict
20
20
from enum import Enum
21
- from typing import TYPE_CHECKING , Any , Callable , Generator , Optional , cast
21
+ from typing import TYPE_CHECKING , Any , Callable , Generator , NamedTuple , Optional , cast
22
22
23
23
from structlog import get_logger
24
24
from twisted .internet .defer import Deferred , inlineCallbacks
44
44
MAX_GET_TRANSACTIONS_BFS_LEN : int = 8
45
45
46
46
47
+ class BlockInfo (NamedTuple ):
48
+ height : int
49
+ id : VertexId
50
+
51
+ def __repr__ (self ):
52
+ return f'BlockInfo({ self .height } , { self .id .hex ()} )'
53
+
54
+ def to_json (self ) -> dict [str , Any ]:
55
+ return {
56
+ 'height' : self .height ,
57
+ 'id' : self .id .hex (),
58
+ }
59
+
60
+
47
61
class PeerState (Enum ):
48
62
ERROR = 'error'
49
63
UNKNOWN = 'unknown'
@@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
92
106
self .receiving_stream = False
93
107
94
108
# highest block where we are synced
95
- self .synced_height = 0
109
+ self .synced_block : Optional [ BlockInfo ] = None
96
110
97
111
# highest block peer has
98
- self .peer_height = 0
112
+ self .peer_best_block : Optional [ BlockInfo ] = None
99
113
100
114
# Latest deferred waiting for a reply.
101
115
self ._deferred_txs : dict [VertexId , Deferred [BaseTransaction ]] = {}
102
116
self ._deferred_tips : Optional [Deferred [list [bytes ]]] = None
103
- self ._deferred_best_block : Optional [Deferred [dict [ str , Any ] ]] = None
104
- self ._deferred_peer_block_hashes : Optional [Deferred [list [tuple [ int , bytes ] ]]] = None
117
+ self ._deferred_best_block : Optional [Deferred [BlockInfo ]] = None
118
+ self ._deferred_peer_block_hashes : Optional [Deferred [list [BlockInfo ]]] = None
105
119
106
120
# When syncing blocks we start streaming with all peers
107
121
# so the moment I get some repeated blocks, I stop the download
@@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]:
151
165
"""
152
166
res = {
153
167
'is_enabled' : self .is_sync_enabled (),
154
- 'peer_height ' : self .peer_height ,
155
- 'synced_height ' : self .synced_height ,
168
+ 'peer_best_block ' : self .peer_best_block . to_json () if self . peer_best_block else None ,
169
+ 'synced_block ' : self .synced_block . to_json () if self . synced_block else None ,
156
170
'synced' : self ._synced ,
157
171
'state' : self .state .value ,
158
172
}
@@ -332,37 +346,43 @@ def run_sync_transactions(self) -> None:
332
346
end_block_height = block_height )
333
347
self .send_get_transactions_bfs (needed_txs , block .hash )
334
348
349
+ def get_my_best_block (self ) -> BlockInfo :
350
+ """Return my best block info."""
351
+ bestblock = self .tx_storage .get_best_block ()
352
+ assert bestblock .hash is not None
353
+ meta = bestblock .get_metadata ()
354
+ # assert not meta.voided_by
355
+ assert meta .validation .is_fully_connected ()
356
+ return BlockInfo (height = bestblock .get_height (), id = bestblock .hash )
357
+
335
358
@inlineCallbacks
336
359
def run_sync_blocks (self ) -> Generator [Any , Any , None ]:
337
360
""" Async step of the block syncing phase.
338
361
"""
339
362
assert self .tx_storage .indexes is not None
340
363
self .state = PeerState .SYNCING_BLOCKS
341
364
342
- # Find my height
343
- bestblock = self .tx_storage .get_best_block ()
344
- assert bestblock .hash is not None
345
- meta = bestblock .get_metadata ()
346
- my_height = meta .height
365
+ # Get my best block.
366
+ my_best_block = self .get_my_best_block ()
347
367
348
- self .log .debug ('run sync blocks' , my_height = my_height )
368
+ # Find peer's best block
369
+ self .peer_best_block = yield self .get_peer_best_block ()
370
+ assert self .peer_best_block is not None
349
371
350
- # Find best block
351
- data = yield self .get_peer_best_block ()
352
- peer_best_block = data ['block' ]
353
- peer_best_height = data ['height' ]
354
- self .peer_height = peer_best_height
372
+ self .log .debug ('run_sync_blocks' , my_best_block = my_best_block , peer_best_block = self .peer_best_block )
355
373
356
374
# find best common block
357
- yield self .find_best_common_block (peer_best_height , peer_best_block )
358
- self .log .debug ('run_sync_blocks' , peer_height = self .peer_height , synced_height = self .synced_height )
375
+ self .synced_block = yield self .find_best_common_block (my_best_block , self .peer_best_block )
376
+ assert self .synced_block is not None
377
+ self .log .debug ('run_sync_blocks' , peer_best_block = self .peer_best_block , synced_block = self .synced_block )
359
378
360
- if self .synced_height < self .peer_height :
379
+ if self .synced_block . height < self .peer_best_block . height :
361
380
# sync from common block
362
- peer_block_at_height = yield self .get_peer_block_hashes ([self .synced_height ])
363
- if peer_block_at_height :
364
- self .run_block_sync (peer_block_at_height [0 ][1 ], self .synced_height , peer_best_block , peer_best_height )
365
- elif my_height == self .synced_height == self .peer_height :
381
+ self .run_block_sync (self .synced_block .id ,
382
+ self .synced_block .height ,
383
+ self .peer_best_block .id ,
384
+ self .peer_best_block .height )
385
+ elif my_best_block .height == self .synced_block .height == self .peer_best_block .height :
366
386
# we're synced and on the same height, get their mempool
367
387
self .state = PeerState .SYNCING_MEMPOOL
368
388
self .mempool_manager .run ()
@@ -494,68 +514,67 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
494
514
return self .tx_storage .transaction_exists (vertex_id )
495
515
496
516
@inlineCallbacks
497
- def find_best_common_block (self , peer_best_height : int , peer_best_block : bytes ) -> Generator [Any , Any , None ]:
517
+ def find_best_common_block (self ,
518
+ my_best_block : BlockInfo ,
519
+ peer_best_block : BlockInfo ) -> Generator [Any , Any , BlockInfo ]:
498
520
""" Search for the highest block/height where we're synced.
499
521
"""
500
- assert self .tx_storage .indexes is not None
501
- my_best_height = self .tx_storage .get_height_best_block ()
502
-
503
- self .log .debug ('find common chain' , peer_height = peer_best_height , my_height = my_best_height )
522
+ self .log .debug ('find_best_common_block' , peer_best_block = peer_best_block , my_best_block = my_best_block )
504
523
505
- if peer_best_height <= my_best_height :
506
- my_block = self .tx_storage .indexes .height .get (peer_best_height )
507
- if my_block == peer_best_block :
524
+ if peer_best_block . height <= my_best_block . height :
525
+ common_block_hash = self .tx_storage .indexes .height .get (peer_best_block . height )
526
+ if peer_best_block . id == common_block_hash :
508
527
# we have all the peer's blocks
509
- if peer_best_height == my_best_height :
528
+ if peer_best_block . height == my_best_block . height :
510
529
# We are in sync, ask for relay so the remote sends transactions in real time
511
530
self .update_synced (True )
512
531
self .send_relay ()
513
532
else :
514
533
self .update_synced (False )
515
-
516
- self .log .debug ('synced to the latest peer block' , height = peer_best_height )
517
- self .synced_height = peer_best_height
518
- return
534
+ self .log .debug ('synced to the latest peer block' , peer_best_block = peer_best_block )
535
+ return BlockInfo (height = peer_best_block .height , id = common_block_hash )
519
536
else :
520
- # TODO peer is on a different best chain
521
- self .log .warn ('peer on different chain' , peer_height = peer_best_height ,
522
- peer_block = peer_best_block . hex (), my_block = ( my_block . hex () if my_block is not None else
523
- None ) )
537
+ # peer is on a different best chain
538
+ self .log .warn ('peer on different chain' ,
539
+ peer_best_block = peer_best_block ,
540
+ my_best_block = my_best_block )
524
541
525
542
self .update_synced (False )
526
- not_synced = min (peer_best_height , my_best_height )
527
- synced = self .synced_height
528
-
529
- while not_synced - synced > 1 :
530
- self .log .debug ('find_best_common_block synced not_synced' , synced = synced , not_synced = not_synced )
531
- step = math .ceil ((not_synced - synced )/ 10 )
532
- heights = []
533
- height = synced
534
- while height < not_synced :
535
- heights .append (height )
536
- height += step
537
- heights .append (not_synced )
543
+
544
+ # Run an n-ary search in the interval [lo, hi).
545
+ # `lo` is always a height where we are synced.
546
+ # `hi` is always a height where sync state is unknown.
547
+ hi = min (peer_best_block .height , my_best_block .height )
548
+ lo = self .synced_block .height if self .synced_block else 0
549
+
550
+ last_block_hash = self ._settings .GENESIS_BLOCK_HASH
551
+
552
+ while hi - lo > 1 :
553
+ self .log .info ('find_best_common_block n-ary search query' , lo = lo , hi = hi )
554
+ step = math .ceil ((hi - lo ) / 10 )
555
+ heights = list (range (lo , hi , step ))
556
+ heights .append (hi )
557
+
538
558
block_height_list = yield self .get_peer_block_hashes (heights )
539
559
block_height_list .reverse ()
540
560
for height , block_hash in block_height_list :
541
561
try :
542
562
# We must check only fully validated transactions.
543
563
blk = self .tx_storage .get_transaction (block_hash )
564
+ except TransactionDoesNotExist :
565
+ hi = height
566
+ else :
544
567
assert blk .get_metadata ().validation .is_fully_connected ()
545
568
assert isinstance (blk , Block )
546
- if height != blk .get_height ():
547
- # WTF?! It should never happen.
548
- self .state = PeerState .ERROR
549
- return
550
- synced = height
569
+ assert height == blk .get_height ()
570
+ lo = height
571
+ last_block_hash = block_hash
551
572
break
552
- except TransactionDoesNotExist :
553
- not_synced = height
554
573
555
- self .log .debug ('find_best_common_block finished synced not_synced ' , synced = synced , not_synced = not_synced )
556
- self . synced_height = synced
574
+ self .log .debug ('find_best_common_block n-ary search finished ' , lo = lo , hi = hi )
575
+ return BlockInfo ( height = lo , id = last_block_hash )
557
576
558
- def get_peer_block_hashes (self , heights : list [int ]) -> Deferred [list [tuple [ int , bytes ] ]]:
577
+ def get_peer_block_hashes (self , heights : list [int ]) -> Deferred [list [BlockInfo ]]:
559
578
""" Returns the peer's block hashes in the given heights.
560
579
"""
561
580
if self ._deferred_peer_block_hashes is not None :
@@ -793,7 +812,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
793
812
self .blockchain_streaming .stop ()
794
813
self .blockchain_streaming = None
795
814
796
- def get_peer_best_block (self ) -> Deferred [dict [ str , Any ] ]:
815
+ def get_peer_best_block (self ) -> Deferred [BlockInfo ]:
797
816
""" Async call to get the remote peer's best block.
798
817
"""
799
818
if self ._deferred_best_block is not None :
@@ -813,21 +832,23 @@ def handle_get_best_block(self, payload: str) -> None:
813
832
"""
814
833
best_block = self .tx_storage .get_best_block ()
815
834
meta = best_block .get_metadata ()
835
+ assert meta .validation .is_fully_connected ()
836
+ # assert not meta.voided_by
816
837
data = {'block' : best_block .hash_hex , 'height' : meta .height }
817
838
self .send_message (ProtocolMessages .BEST_BLOCK , json .dumps (data ))
818
839
819
840
def handle_best_block (self , payload : str ) -> None :
820
841
""" Handle a BEST-BLOCK message.
821
842
"""
822
843
data = json .loads (payload )
823
- assert self . protocol . connections is not None
824
- self . log . debug ( 'got best block' , ** data )
825
- data [ 'block' ] = bytes . fromhex ( data [ 'block' ] )
844
+ _id = bytes . fromhex ( data [ 'block' ])
845
+ height = data [ 'height' ]
846
+ best_block = BlockInfo ( height = height , id = _id )
826
847
827
848
deferred = self ._deferred_best_block
828
849
self ._deferred_best_block = None
829
850
if deferred :
830
- deferred .callback (data )
851
+ deferred .callback (best_block )
831
852
832
853
def _setup_tx_streaming (self ):
833
854
""" Common setup before starting an outgoing transaction stream.
0 commit comments