Skip to content

Commit 89d347e

Browse files
committed
refactor(sync-v2): Refactor sync_v2 agent to hold (block_height, block_id) information in an internal namedtuple
1 parent 20474fd commit 89d347e

File tree

5 files changed

+119
-101
lines changed

5 files changed

+119
-101
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 95 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import struct
1919
from collections import OrderedDict
2020
from enum import Enum
21-
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
21+
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast
2222

2323
from structlog import get_logger
2424
from twisted.internet.defer import Deferred, inlineCallbacks
@@ -44,6 +44,20 @@
4444
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8
4545

4646

47+
class _HeightInfo(NamedTuple):
48+
height: int
49+
id: VertexId
50+
51+
def __repr__(self):
52+
return f'_HeightInfo({self.height}, {self.id.hex()})'
53+
54+
def to_json(self) -> dict[str, Any]:
55+
return {
56+
'height': self.height,
57+
'id': self.id.hex(),
58+
}
59+
60+
4761
class PeerState(Enum):
4862
ERROR = 'error'
4963
UNKNOWN = 'unknown'
@@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
92106
self.receiving_stream = False
93107

94108
# highest block where we are synced
95-
self.synced_height = 0
109+
self.synced_block: Optional[_HeightInfo] = None
96110

97111
# highest block peer has
98-
self.peer_height = 0
112+
self.peer_best_block: Optional[_HeightInfo] = None
99113

100114
# Latest deferred waiting for a reply.
101115
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
102116
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
103-
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
104-
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
117+
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
118+
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None
105119

106120
# When syncing blocks we start streaming with all peers
107121
# so the moment I get some repeated blocks, I stop the download
@@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]:
151165
"""
152166
res = {
153167
'is_enabled': self.is_sync_enabled(),
154-
'peer_height': self.peer_height,
155-
'synced_height': self.synced_height,
168+
'peer_best_block': self.peer_best_block.to_json() if self.peer_best_block else None,
169+
'synced_block': self.synced_block.to_json() if self.synced_block else None,
156170
'synced': self._synced,
157171
'state': self.state.value,
158172
}
@@ -332,37 +346,43 @@ def run_sync_transactions(self) -> None:
332346
end_block_height=block_height)
333347
self.send_get_transactions_bfs(needed_txs, block.hash)
334348

349+
def get_my_best_block(self) -> _HeightInfo:
350+
"""Return my best block info."""
351+
bestblock = self.tx_storage.get_best_block()
352+
assert bestblock.hash is not None
353+
meta = bestblock.get_metadata()
354+
assert meta.validation.is_fully_connected()
355+
return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash)
356+
335357
@inlineCallbacks
336358
def run_sync_blocks(self) -> Generator[Any, Any, None]:
337359
""" Async step of the block syncing phase.
338360
"""
339361
assert self.tx_storage.indexes is not None
340362
self.state = PeerState.SYNCING_BLOCKS
341363

342-
# Find my height
343-
bestblock = self.tx_storage.get_best_block()
344-
assert bestblock.hash is not None
345-
meta = bestblock.get_metadata()
346-
my_height = meta.height
347-
348-
self.log.debug('run sync blocks', my_height=my_height)
364+
# Get my best block.
365+
my_best_block = self.get_my_best_block()
349366

350-
# Find best block
351-
data = yield self.get_peer_best_block()
352-
peer_best_block = data['block']
353-
peer_best_height = data['height']
354-
self.peer_height = peer_best_height
367+
# Find peer's best block
368+
self.peer_best_block = yield self.get_peer_best_block()
369+
assert self.peer_best_block is not None
355370

356371
# find best common block
357-
yield self.find_best_common_block(peer_best_height, peer_best_block)
358-
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)
359-
360-
if self.synced_height < self.peer_height:
372+
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
373+
assert self.synced_block is not None
374+
self.log.debug('run_sync_blocks',
375+
my_best_block=my_best_block,
376+
peer_best_block=self.peer_best_block,
377+
synced_block=self.synced_block)
378+
379+
if self.synced_block.height < self.peer_best_block.height:
361380
# sync from common block
362-
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
363-
if peer_block_at_height:
364-
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
365-
elif my_height == self.synced_height == self.peer_height:
381+
self.run_block_sync(self.synced_block.id,
382+
self.synced_block.height,
383+
self.peer_best_block.id,
384+
self.peer_best_block.height)
385+
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
366386
# we're synced and on the same height, get their mempool
367387
self.state = PeerState.SYNCING_MEMPOOL
368388
self.mempool_manager.run()
@@ -494,68 +514,69 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
494514
return self.tx_storage.transaction_exists(vertex_id)
495515

496516
@inlineCallbacks
497-
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
517+
def find_best_common_block(self,
518+
my_best_block: _HeightInfo,
519+
peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]:
498520
""" Search for the highest block/height where we're synced.
499521
"""
500-
assert self.tx_storage.indexes is not None
501-
my_best_height = self.tx_storage.get_height_best_block()
502-
503-
self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
522+
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)
504523

505-
if peer_best_height <= my_best_height:
506-
my_block = self.tx_storage.indexes.height.get(peer_best_height)
507-
if my_block == peer_best_block:
524+
if peer_best_block.height <= my_best_block.height:
525+
assert self.tx_storage.indexes is not None
526+
common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height)
527+
if peer_best_block.id == common_block_hash:
508528
# we have all the peer's blocks
509-
if peer_best_height == my_best_height:
529+
if peer_best_block.height == my_best_block.height:
510530
# We are in sync, ask for relay so the remote sends transactions in real time
511531
self.update_synced(True)
512532
self.send_relay()
513533
else:
514534
self.update_synced(False)
515-
516-
self.log.debug('synced to the latest peer block', height=peer_best_height)
517-
self.synced_height = peer_best_height
518-
return
535+
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
536+
return _HeightInfo(height=peer_best_block.height, id=common_block_hash)
519537
else:
520-
# TODO peer is on a different best chain
521-
self.log.warn('peer on different chain', peer_height=peer_best_height,
522-
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
523-
None))
538+
# peer is on a different best chain
539+
self.log.warn('peer on different chain',
540+
peer_best_block=peer_best_block,
541+
my_best_block=my_best_block)
524542

525543
self.update_synced(False)
526-
not_synced = min(peer_best_height, my_best_height)
527-
synced = self.synced_height
528-
529-
while not_synced - synced > 1:
530-
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
531-
step = math.ceil((not_synced - synced)/10)
532-
heights = []
533-
height = synced
534-
while height < not_synced:
535-
heights.append(height)
536-
height += step
537-
heights.append(not_synced)
544+
545+
# Run an n-ary search in the interval [lo, hi).
546+
# `lo` is always a height where we are synced.
547+
# `hi` is always a height where sync state is unknown.
548+
hi = min(peer_best_block.height, my_best_block.height)
549+
lo = 0
550+
551+
lo_block_hash = self._settings.GENESIS_BLOCK_HASH
552+
553+
while hi - lo > 1:
554+
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
555+
step = math.ceil((hi - lo) / 10)
556+
heights = list(range(lo, hi, step))
557+
heights.append(hi)
558+
538559
block_height_list = yield self.get_peer_block_hashes(heights)
539-
block_height_list.reverse()
560+
block_height_list.sort(key=lambda x: x.height, reverse=True)
561+
540562
for height, block_hash in block_height_list:
541563
try:
542564
# We must check only fully validated transactions.
543565
blk = self.tx_storage.get_transaction(block_hash)
566+
except TransactionDoesNotExist:
567+
hi = height
568+
else:
544569
assert blk.get_metadata().validation.is_fully_connected()
545570
assert isinstance(blk, Block)
546-
if height != blk.get_height():
547-
# WTF?! It should never happen.
548-
self.state = PeerState.ERROR
549-
return
550-
synced = height
571+
assert height == blk.get_height()
572+
lo = height
573+
lo_block_hash = block_hash
551574
break
552-
except TransactionDoesNotExist:
553-
not_synced = height
554575

555-
self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
556-
self.synced_height = synced
576+
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
577+
return _HeightInfo(height=lo, id=lo_block_hash)
557578

558-
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
579+
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
559580
""" Returns the peer's block hashes in the given heights.
560581
"""
561582
if self._deferred_peer_block_hashes is not None:
@@ -597,7 +618,7 @@ def handle_peer_block_hashes(self, payload: str) -> None:
597618
""" Handle a PEER-BLOCK-HASHES message.
598619
"""
599620
data = json.loads(payload)
600-
data = [(h, bytes.fromhex(block_hash)) for (h, block_hash) in data]
621+
data = [_HeightInfo(height=h, id=bytes.fromhex(block_hash)) for (h, block_hash) in data]
601622
deferred = self._deferred_peer_block_hashes
602623
self._deferred_peer_block_hashes = None
603624
if deferred:
@@ -799,7 +820,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
799820
self.blockchain_streaming.stop()
800821
self.blockchain_streaming = None
801822

802-
def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
823+
def get_peer_best_block(self) -> Deferred[_HeightInfo]:
803824
""" Async call to get the remote peer's best block.
804825
"""
805826
if self._deferred_best_block is not None:
@@ -819,21 +840,22 @@ def handle_get_best_block(self, payload: str) -> None:
819840
"""
820841
best_block = self.tx_storage.get_best_block()
821842
meta = best_block.get_metadata()
843+
assert meta.validation.is_fully_connected()
822844
data = {'block': best_block.hash_hex, 'height': meta.height}
823845
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))
824846

825847
def handle_best_block(self, payload: str) -> None:
826848
""" Handle a BEST-BLOCK message.
827849
"""
828850
data = json.loads(payload)
829-
assert self.protocol.connections is not None
830-
self.log.debug('got best block', **data)
831-
data['block'] = bytes.fromhex(data['block'])
851+
_id = bytes.fromhex(data['block'])
852+
height = data['height']
853+
best_block = _HeightInfo(height=height, id=_id)
832854

833855
deferred = self._deferred_best_block
834856
self._deferred_best_block = None
835857
if deferred:
836-
deferred.callback(data)
858+
deferred.callback(best_block)
837859

838860
def _setup_tx_streaming(self):
839861
""" Common setup before starting an outgoing transaction stream.

tests/p2p/test_get_best_blockchain.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
class BaseGetBestBlockchainTestCase(SimulatorTestCase):
2020

21+
seed_config = 6
22+
2123
def _send_cmd(self, proto, cmd, payload=None):
2224
if not payload:
2325
line = '{}\r\n'.format(cmd)

tests/p2p/test_sync.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,9 @@ def test_sync_metadata(self):
503503
# check they have the same consensus
504504
node_sync1 = conn.proto1.state.sync_agent
505505
node_sync2 = conn.proto2.state.sync_agent
506-
self.assertEqual(node_sync1.peer_height, height)
507-
self.assertEqual(node_sync1.synced_height, height)
508-
self.assertEqual(node_sync2.peer_height, height)
506+
self.assertEqual(node_sync1.peer_best_block.height, height)
507+
self.assertEqual(node_sync1.synced_block.height, height)
508+
self.assertEqual(node_sync2.peer_best_block.height, height)
509509
# 3 genesis + blocks + 8 txs
510510
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), height + 11)
511511
self.assertEqual(manager2.tx_storage.get_vertices_count(), height + 11)
@@ -527,14 +527,14 @@ def test_tx_propagation_nat_peers(self):
527527

528528
node_sync1 = self.conn1.proto1.state.sync_agent
529529
self.assertEqual(self.manager1.tx_storage.latest_timestamp, self.manager2.tx_storage.latest_timestamp)
530-
self.assertEqual(node_sync1.peer_height, node_sync1.synced_height)
531-
self.assertEqual(node_sync1.peer_height, self.manager1.tx_storage.get_height_best_block())
530+
self.assertEqual(node_sync1.peer_best_block, node_sync1.synced_block)
531+
self.assertEqual(node_sync1.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
532532
self.assertConsensusEqual(self.manager1, self.manager2)
533533

534534
node_sync2 = self.conn2.proto1.state.sync_agent
535535
self.assertEqual(self.manager2.tx_storage.latest_timestamp, self.manager3.tx_storage.latest_timestamp)
536-
self.assertEqual(node_sync2.peer_height, node_sync2.synced_height)
537-
self.assertEqual(node_sync2.peer_height, self.manager2.tx_storage.get_height_best_block())
536+
self.assertEqual(node_sync2.peer_best_block, node_sync2.synced_block)
537+
self.assertEqual(node_sync2.peer_best_block.height, self.manager2.tx_storage.get_height_best_block())
538538
self.assertConsensusEqual(self.manager2, self.manager3)
539539

540540
def test_block_sync_new_blocks_and_txs(self):
@@ -560,8 +560,8 @@ def test_block_sync_new_blocks_and_txs(self):
560560

561561
node_sync = conn.proto1.state.sync_agent
562562
self.assertEqual(self.manager1.tx_storage.latest_timestamp, manager2.tx_storage.latest_timestamp)
563-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
564-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
563+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
564+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
565565
self.assertConsensusEqual(self.manager1, manager2)
566566
self.assertConsensusValid(self.manager1)
567567
self.assertConsensusValid(manager2)
@@ -581,8 +581,8 @@ def test_block_sync_many_new_blocks(self):
581581
self.clock.advance(1)
582582

583583
node_sync = conn.proto1.state.sync_agent
584-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
585-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
584+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
585+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
586586
self.assertConsensusEqual(self.manager1, manager2)
587587
self.assertConsensusValid(self.manager1)
588588
self.assertConsensusValid(manager2)
@@ -602,8 +602,8 @@ def test_block_sync_new_blocks(self):
602602
self.clock.advance(1)
603603

604604
node_sync = conn.proto1.state.sync_agent
605-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
606-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
605+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
606+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
607607
self.assertConsensusEqual(self.manager1, manager2)
608608
self.assertConsensusValid(self.manager1)
609609
self.assertConsensusValid(manager2)
@@ -664,9 +664,9 @@ def test_full_sync(self):
664664

665665
node_sync1 = conn.proto1.state.sync_agent
666666
node_sync2 = conn.proto2.state.sync_agent
667-
self.assertEqual(node_sync1.peer_height, common_height)
668-
self.assertEqual(node_sync1.synced_height, common_height)
669-
self.assertEqual(node_sync2.peer_height, common_height)
667+
self.assertEqual(node_sync1.peer_best_block.height, common_height)
668+
self.assertEqual(node_sync1.synced_block.height, common_height)
669+
self.assertEqual(node_sync2.peer_best_block.height, common_height)
670670
self.assertConsensusValid(self.manager1)
671671
self.assertConsensusValid(manager2)
672672
self.assertConsensusEqual(self.manager1, manager2)
@@ -715,9 +715,9 @@ def test_block_sync_checkpoints(self):
715715
node_sync1 = conn.proto1.state.sync_agent
716716
node_sync2 = conn.proto2.state.sync_agent
717717

718-
self.assertEqual(node_sync1.peer_height, TOTAL_BLOCKS)
719-
self.assertEqual(node_sync1.synced_height, TOTAL_BLOCKS)
720-
self.assertEqual(node_sync2.peer_height, len(blocks))
718+
self.assertEqual(node_sync1.peer_best_block.height, TOTAL_BLOCKS)
719+
self.assertEqual(node_sync1.synced_block.height, TOTAL_BLOCKS)
720+
self.assertEqual(node_sync2.peer_best_block.height, len(blocks))
721721
self.assertConsensusValid(self.manager1)
722722
self.assertConsensusValid(manager2)
723723

@@ -738,8 +738,8 @@ def test_block_sync_only_genesis(self):
738738
self.clock.advance(1)
739739

740740
node_sync = conn.proto1.state.sync_agent
741-
self.assertEqual(node_sync.synced_height, 0)
742-
self.assertEqual(node_sync.peer_height, 0)
741+
self.assertEqual(node_sync.synced_block.height, 0)
742+
self.assertEqual(node_sync.peer_best_block.height, 0)
743743

744744
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), 3)
745745
self.assertEqual(manager2.tx_storage.get_vertices_count(), 3)

0 commit comments

Comments
 (0)