Skip to content

Commit 57bf117

Browse files
committed
refactor(sync-v2): Use BlockInfo in sync_v2 agent to hold (block_height, block_id) information
1 parent 0a78bb9 commit 57bf117

File tree

1 file changed

+57
-47
lines changed

1 file changed

+57
-47
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import struct
1919
from collections import OrderedDict
2020
from enum import Enum
21-
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
21+
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast
2222

2323
from structlog import get_logger
2424
from twisted.internet.defer import Deferred, inlineCallbacks
@@ -44,6 +44,14 @@
4444
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8
4545

4646

47+
class BlockInfo(NamedTuple):
48+
height: int
49+
id: VertexId
50+
51+
def __repr__(self):
52+
return f'BlockInfo({self.height}, {self.id.hex()})'
53+
54+
4755
class PeerState(Enum):
4856
ERROR = 'error'
4957
UNKNOWN = 'unknown'
@@ -92,16 +100,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
92100
self.receiving_stream = False
93101

94102
# highest block where we are synced
95-
self.synced_height = 0
103+
self.synced_block: Optional[BlockInfo] = None
96104

97105
# highest block peer has
98-
self.peer_height = 0
106+
self.peer_best_block: Optional[BlockInfo] = None
99107

100108
# Latest deferred waiting for a reply.
101109
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
102110
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
103-
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
104-
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
111+
self._deferred_best_block: Optional[Deferred[BlockInfo]] = None
112+
self._deferred_peer_block_hashes: Optional[Deferred[list[BlockInfo]]] = None
105113

106114
# When syncing blocks we start streaming with all peers
107115
# so the moment I get some repeated blocks, I stop the download
@@ -151,8 +159,8 @@ def get_status(self) -> dict[str, Any]:
151159
"""
152160
res = {
153161
'is_enabled': self.is_sync_enabled(),
154-
'peer_height': self.peer_height,
155-
'synced_height': self.synced_height,
162+
'peer_best_block': self.peer_best_block,
163+
'synced_block': self.synced_block,
156164
'synced': self._synced,
157165
'state': self.state.value,
158166
}
@@ -347,22 +355,22 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]:
347355

348356
self.log.debug('run sync blocks', my_height=my_height)
349357

350-
# Find best block
351-
data = yield self.get_peer_best_block()
352-
peer_best_block = data['block']
353-
peer_best_height = data['height']
354-
self.peer_height = peer_best_height
358+
# Find peer's best block
359+
self.peer_best_block = yield self.get_peer_best_block()
360+
assert self.peer_best_block is not None
355361

356362
# find best common block
357-
yield self.find_best_common_block(peer_best_height, peer_best_block)
358-
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)
363+
self.synced_block = yield self.find_best_common_block(self.peer_best_block)
364+
assert self.synced_block is not None
365+
self.log.debug('run_sync_blocks', peer_best_block=self.peer_best_block, synced_block=self.synced_block)
359366

360-
if self.synced_height < self.peer_height:
367+
if self.synced_block.height < self.peer_best_block.height:
361368
# sync from common block
362-
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
363-
if peer_block_at_height:
364-
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
365-
elif my_height == self.synced_height == self.peer_height:
369+
self.run_block_sync(self.synced_block.id,
370+
self.synced_block.height,
371+
self.peer_best_block.id,
372+
self.peer_best_block.height)
373+
elif my_height == self.synced_block.height == self.peer_best_block.height:
366374
# we're synced and on the same height, get their mempool
367375
self.state = PeerState.SYNCING_MEMPOOL
368376
self.mempool_manager.run()
@@ -494,40 +502,41 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
494502
return self.tx_storage.transaction_exists(vertex_id)
495503

496504
@inlineCallbacks
497-
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
505+
def find_best_common_block(self, peer_best_block: BlockInfo) -> Generator[Any, Any, BlockInfo]:
498506
""" Search for the highest block/height where we're synced.
499507
"""
500508
assert self.tx_storage.indexes is not None
501509
my_best_height = self.tx_storage.get_height_best_block()
502510

503-
self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
511+
self.log.debug('find common chain', peer_height=peer_best_block.height, my_height=my_best_height)
504512

505-
if peer_best_height <= my_best_height:
506-
my_block = self.tx_storage.indexes.height.get(peer_best_height)
507-
if my_block == peer_best_block:
513+
if peer_best_block.height <= my_best_height:
514+
my_block = self.tx_storage.indexes.height.get(peer_best_block.height)
515+
if my_block == peer_best_block.id:
508516
# we have all the peer's blocks
509-
if peer_best_height == my_best_height:
517+
if peer_best_block.height == my_best_height:
510518
# We are in sync, ask for relay so the remote sends transactions in real time
511519
self.update_synced(True)
512520
self.send_relay()
513521
else:
514522
self.update_synced(False)
515523

516-
self.log.debug('synced to the latest peer block', height=peer_best_height)
517-
self.synced_height = peer_best_height
518-
return
524+
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
525+
return peer_best_block
519526
else:
520527
# TODO peer is on a different best chain
521-
self.log.warn('peer on different chain', peer_height=peer_best_height,
522-
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
523-
None))
528+
self.log.warn('peer on different chain',
529+
peer_best_block=peer_best_block,
530+
my_block=(my_block.hex() if my_block is not None else None))
524531

525532
self.update_synced(False)
526-
not_synced = min(peer_best_height, my_best_height)
527-
synced = self.synced_height
533+
not_synced = min(peer_best_block.height, my_best_height)
534+
synced = self.synced_block.height if self.synced_block else 0
535+
536+
last_block_hash = self._settings.GENESIS_BLOCK_HASH
528537

529538
while not_synced - synced > 1:
530-
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
539+
self.log.info('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
531540
step = math.ceil((not_synced - synced)/10)
532541
heights = []
533542
height = synced
@@ -541,21 +550,20 @@ def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes)
541550
try:
542551
# We must check only fully validated transactions.
543552
blk = self.tx_storage.get_transaction(block_hash)
553+
except TransactionDoesNotExist:
554+
not_synced = height
555+
else:
544556
assert blk.get_metadata().validation.is_fully_connected()
545557
assert isinstance(blk, Block)
546-
if height != blk.get_height():
547-
# WTF?! It should never happen.
548-
self.state = PeerState.ERROR
549-
return
558+
assert height == blk.get_height()
550559
synced = height
560+
last_block_hash = block_hash
551561
break
552-
except TransactionDoesNotExist:
553-
not_synced = height
554562

555563
self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
556-
self.synced_height = synced
564+
return BlockInfo(height=synced, id=last_block_hash)
557565

558-
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
566+
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[BlockInfo]]:
559567
""" Returns the peer's block hashes in the given heights.
560568
"""
561569
if self._deferred_peer_block_hashes is not None:
@@ -793,7 +801,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
793801
self.blockchain_streaming.stop()
794802
self.blockchain_streaming = None
795803

796-
def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
804+
def get_peer_best_block(self) -> Deferred[BlockInfo]:
797805
""" Async call to get the remote peer's best block.
798806
"""
799807
if self._deferred_best_block is not None:
@@ -813,21 +821,23 @@ def handle_get_best_block(self, payload: str) -> None:
813821
"""
814822
best_block = self.tx_storage.get_best_block()
815823
meta = best_block.get_metadata()
824+
assert meta.validation.is_fully_connected()
825+
assert not meta.voided_by
816826
data = {'block': best_block.hash_hex, 'height': meta.height}
817827
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))
818828

819829
def handle_best_block(self, payload: str) -> None:
820830
""" Handle a BEST-BLOCK message.
821831
"""
822832
data = json.loads(payload)
823-
assert self.protocol.connections is not None
824-
self.log.debug('got best block', **data)
825-
data['block'] = bytes.fromhex(data['block'])
833+
_id = bytes.fromhex(data['block'])
834+
height = data['height']
835+
best_block = BlockInfo(height=height, id=_id)
826836

827837
deferred = self._deferred_best_block
828838
self._deferred_best_block = None
829839
if deferred:
830-
deferred.callback(data)
840+
deferred.callback(best_block)
831841

832842
def _setup_tx_streaming(self):
833843
""" Common setup before starting an outgoing transaction stream.

0 commit comments

Comments
 (0)