Skip to content

Commit 78373d2

Browse files
committed
refactor(sync-v2): Use BlockInfo in sync_v2 agent to hold (block_height, block_id) information
1 parent 0a78bb9 commit 78373d2

File tree

1 file changed

+83
-68
lines changed

1 file changed

+83
-68
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 83 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import struct
1919
from collections import OrderedDict
2020
from enum import Enum
21-
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
21+
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast
2222

2323
from structlog import get_logger
2424
from twisted.internet.defer import Deferred, inlineCallbacks
@@ -44,6 +44,14 @@
4444
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8
4545

4646

47+
class BlockInfo(NamedTuple):
48+
height: int
49+
id: VertexId
50+
51+
def __repr__(self):
52+
return f'BlockInfo({self.height}, {self.id.hex()})'
53+
54+
4755
class PeerState(Enum):
4856
ERROR = 'error'
4957
UNKNOWN = 'unknown'
@@ -92,16 +100,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
92100
self.receiving_stream = False
93101

94102
# highest block where we are synced
95-
self.synced_height = 0
103+
self.synced_block: Optional[BlockInfo] = None
96104

97105
# highest block peer has
98-
self.peer_height = 0
106+
self.peer_best_block: Optional[BlockInfo] = None
99107

100108
# Latest deferred waiting for a reply.
101109
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
102110
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
103-
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
104-
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
111+
self._deferred_best_block: Optional[Deferred[BlockInfo]] = None
112+
self._deferred_peer_block_hashes: Optional[Deferred[list[BlockInfo]]] = None
105113

106114
# When syncing blocks we start streaming with all peers
107115
# so the moment I get some repeated blocks, I stop the download
@@ -151,8 +159,8 @@ def get_status(self) -> dict[str, Any]:
151159
"""
152160
res = {
153161
'is_enabled': self.is_sync_enabled(),
154-
'peer_height': self.peer_height,
155-
'synced_height': self.synced_height,
162+
'peer_best_block': self.peer_best_block,
163+
'synced_block': self.synced_block,
156164
'synced': self._synced,
157165
'state': self.state.value,
158166
}
@@ -332,37 +340,43 @@ def run_sync_transactions(self) -> None:
332340
end_block_height=block_height)
333341
self.send_get_transactions_bfs(needed_txs, block.hash)
334342

343+
def get_my_best_block(self) -> BlockInfo:
344+
"""Return my best block info."""
345+
bestblock = self.tx_storage.get_best_block()
346+
assert bestblock.hash is not None
347+
meta = bestblock.get_metadata()
348+
assert not meta.voided_by
349+
assert meta.validation.is_fully_connected()
350+
return BlockInfo(height=bestblock.get_height(), id=bestblock.hash)
351+
335352
@inlineCallbacks
336353
def run_sync_blocks(self) -> Generator[Any, Any, None]:
337354
""" Async step of the block syncing phase.
338355
"""
339356
assert self.tx_storage.indexes is not None
340357
self.state = PeerState.SYNCING_BLOCKS
341358

342-
# Find my height
343-
bestblock = self.tx_storage.get_best_block()
344-
assert bestblock.hash is not None
345-
meta = bestblock.get_metadata()
346-
my_height = meta.height
359+
# Get my best block.
360+
my_best_block = self.get_my_best_block()
347361

348-
self.log.debug('run sync blocks', my_height=my_height)
362+
# Find peer's best block
363+
self.peer_best_block = yield self.get_peer_best_block()
364+
assert self.peer_best_block is not None
349365

350-
# Find best block
351-
data = yield self.get_peer_best_block()
352-
peer_best_block = data['block']
353-
peer_best_height = data['height']
354-
self.peer_height = peer_best_height
366+
self.log.debug('run_sync_blocks', my_best_block=my_best_block, peer_best_block=self.peer_best_block)
355367

356368
# find best common block
357-
yield self.find_best_common_block(peer_best_height, peer_best_block)
358-
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)
369+
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
370+
assert self.synced_block is not None
371+
self.log.debug('run_sync_blocks', peer_best_block=self.peer_best_block, synced_block=self.synced_block)
359372

360-
if self.synced_height < self.peer_height:
373+
if self.synced_block.height < self.peer_best_block.height:
361374
# sync from common block
362-
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
363-
if peer_block_at_height:
364-
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
365-
elif my_height == self.synced_height == self.peer_height:
375+
self.run_block_sync(self.synced_block.id,
376+
self.synced_block.height,
377+
self.peer_best_block.id,
378+
self.peer_best_block.height)
379+
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
366380
# we're synced and on the same height, get their mempool
367381
self.state = PeerState.SYNCING_MEMPOOL
368382
self.mempool_manager.run()
@@ -494,68 +508,67 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
494508
return self.tx_storage.transaction_exists(vertex_id)
495509

496510
@inlineCallbacks
497-
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
511+
def find_best_common_block(self,
512+
my_best_block: BlockInfo,
513+
peer_best_block: BlockInfo) -> Generator[Any, Any, BlockInfo]:
498514
""" Search for the highest block/height where we're synced.
499515
"""
500-
assert self.tx_storage.indexes is not None
501-
my_best_height = self.tx_storage.get_height_best_block()
516+
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)
502517

503-
self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
504-
505-
if peer_best_height <= my_best_height:
506-
my_block = self.tx_storage.indexes.height.get(peer_best_height)
507-
if my_block == peer_best_block:
518+
if peer_best_block.height <= my_best_block.height:
519+
if peer_best_block.id == my_best_block.id:
508520
# we have all the peer's blocks
509-
if peer_best_height == my_best_height:
521+
if peer_best_block.height == my_best_block.height:
510522
# We are in sync, ask for relay so the remote sends transactions in real time
511523
self.update_synced(True)
512524
self.send_relay()
513525
else:
514526
self.update_synced(False)
515527

516-
self.log.debug('synced to the latest peer block', height=peer_best_height)
517-
self.synced_height = peer_best_height
518-
return
528+
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
529+
return peer_best_block
519530
else:
520-
# TODO peer is on a different best chain
521-
self.log.warn('peer on different chain', peer_height=peer_best_height,
522-
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
523-
None))
531+
# peer is on a different best chain
532+
self.log.warn('peer on different chain',
533+
peer_best_block=peer_best_block,
534+
my_best_block=my_best_block)
524535

525536
self.update_synced(False)
526-
not_synced = min(peer_best_height, my_best_height)
527-
synced = self.synced_height
528-
529-
while not_synced - synced > 1:
530-
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
531-
step = math.ceil((not_synced - synced)/10)
532-
heights = []
533-
height = synced
534-
while height < not_synced:
535-
heights.append(height)
536-
height += step
537-
heights.append(not_synced)
537+
538+
# Run an n-ary search in the interval [lo, hi).
539+
# `lo` is always a height where we are synced.
540+
# `hi` is always a height where sync state is unknown.
541+
hi = min(peer_best_block.height, my_best_block.height)
542+
lo = self.synced_block.height if self.synced_block else 0
543+
544+
last_block_hash = self._settings.GENESIS_BLOCK_HASH
545+
546+
while hi - lo > 1:
547+
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
548+
step = math.ceil((hi - lo) / 10)
549+
heights = list(range(lo, hi, step))
550+
heights.append(hi)
551+
538552
block_height_list = yield self.get_peer_block_hashes(heights)
539553
block_height_list.reverse()
540554
for height, block_hash in block_height_list:
541555
try:
542556
# We must check only fully validated transactions.
543557
blk = self.tx_storage.get_transaction(block_hash)
558+
except TransactionDoesNotExist:
559+
hi = height
560+
else:
544561
assert blk.get_metadata().validation.is_fully_connected()
545562
assert isinstance(blk, Block)
546-
if height != blk.get_height():
547-
# WTF?! It should never happen.
548-
self.state = PeerState.ERROR
549-
return
550-
synced = height
563+
assert height == blk.get_height()
564+
lo = height
565+
last_block_hash = block_hash
551566
break
552-
except TransactionDoesNotExist:
553-
not_synced = height
554567

555-
self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
556-
self.synced_height = synced
568+
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
569+
return BlockInfo(height=lo, id=last_block_hash)
557570

558-
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
571+
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[BlockInfo]]:
559572
""" Returns the peer's block hashes in the given heights.
560573
"""
561574
if self._deferred_peer_block_hashes is not None:
@@ -793,7 +806,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
793806
self.blockchain_streaming.stop()
794807
self.blockchain_streaming = None
795808

796-
def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
809+
def get_peer_best_block(self) -> Deferred[BlockInfo]:
797810
""" Async call to get the remote peer's best block.
798811
"""
799812
if self._deferred_best_block is not None:
@@ -813,21 +826,23 @@ def handle_get_best_block(self, payload: str) -> None:
813826
"""
814827
best_block = self.tx_storage.get_best_block()
815828
meta = best_block.get_metadata()
829+
assert meta.validation.is_fully_connected()
830+
assert not meta.voided_by
816831
data = {'block': best_block.hash_hex, 'height': meta.height}
817832
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))
818833

819834
def handle_best_block(self, payload: str) -> None:
820835
""" Handle a BEST-BLOCK message.
821836
"""
822837
data = json.loads(payload)
823-
assert self.protocol.connections is not None
824-
self.log.debug('got best block', **data)
825-
data['block'] = bytes.fromhex(data['block'])
838+
_id = bytes.fromhex(data['block'])
839+
height = data['height']
840+
best_block = BlockInfo(height=height, id=_id)
826841

827842
deferred = self._deferred_best_block
828843
self._deferred_best_block = None
829844
if deferred:
830-
deferred.callback(data)
845+
deferred.callback(best_block)
831846

832847
def _setup_tx_streaming(self):
833848
""" Common setup before starting an outgoing transaction stream.

0 commit comments

Comments
 (0)