Skip to content

Commit c0cdfa3

Browse files
committed
refactor(sync-v2): Use BlockInfo in sync_v2 agent to hold (block_height, block_id) information
1 parent 0a78bb9 commit c0cdfa3

File tree

5 files changed

+116
-92
lines changed

5 files changed

+116
-92
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 91 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import struct
1919
from collections import OrderedDict
2020
from enum import Enum
21-
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
21+
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast
2222

2323
from structlog import get_logger
2424
from twisted.internet.defer import Deferred, inlineCallbacks
@@ -44,6 +44,20 @@
4444
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8
4545

4646

47+
class BlockInfo(NamedTuple):
48+
height: int
49+
id: VertexId
50+
51+
def __repr__(self):
52+
return f'BlockInfo({self.height}, {self.id.hex()})'
53+
54+
def to_json(self) -> dict[str, Any]:
55+
return {
56+
'height': self.height,
57+
'id': self.id.hex(),
58+
}
59+
60+
4761
class PeerState(Enum):
4862
ERROR = 'error'
4963
UNKNOWN = 'unknown'
@@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
92106
self.receiving_stream = False
93107

94108
# highest block where we are synced
95-
self.synced_height = 0
109+
self.synced_block: Optional[BlockInfo] = None
96110

97111
# highest block peer has
98-
self.peer_height = 0
112+
self.peer_best_block: Optional[BlockInfo] = None
99113

100114
# Latest deferred waiting for a reply.
101115
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
102116
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
103-
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
104-
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
117+
self._deferred_best_block: Optional[Deferred[BlockInfo]] = None
118+
self._deferred_peer_block_hashes: Optional[Deferred[list[BlockInfo]]] = None
105119

106120
# When syncing blocks we start streaming with all peers
107121
# so the moment I get some repeated blocks, I stop the download
@@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]:
151165
"""
152166
res = {
153167
'is_enabled': self.is_sync_enabled(),
154-
'peer_height': self.peer_height,
155-
'synced_height': self.synced_height,
168+
'peer_best_block': self.peer_best_block.to_json() if self.peer_best_block else None,
169+
'synced_block': self.synced_block.to_json() if self.synced_block else None,
156170
'synced': self._synced,
157171
'state': self.state.value,
158172
}
@@ -332,37 +346,43 @@ def run_sync_transactions(self) -> None:
332346
end_block_height=block_height)
333347
self.send_get_transactions_bfs(needed_txs, block.hash)
334348

349+
def get_my_best_block(self) -> BlockInfo:
350+
"""Return my best block info."""
351+
bestblock = self.tx_storage.get_best_block()
352+
assert bestblock.hash is not None
353+
meta = bestblock.get_metadata()
354+
# assert not meta.voided_by
355+
assert meta.validation.is_fully_connected()
356+
return BlockInfo(height=bestblock.get_height(), id=bestblock.hash)
357+
335358
@inlineCallbacks
336359
def run_sync_blocks(self) -> Generator[Any, Any, None]:
337360
""" Async step of the block syncing phase.
338361
"""
339362
assert self.tx_storage.indexes is not None
340363
self.state = PeerState.SYNCING_BLOCKS
341364

342-
# Find my height
343-
bestblock = self.tx_storage.get_best_block()
344-
assert bestblock.hash is not None
345-
meta = bestblock.get_metadata()
346-
my_height = meta.height
365+
# Get my best block.
366+
my_best_block = self.get_my_best_block()
347367

348-
self.log.debug('run sync blocks', my_height=my_height)
368+
# Find peer's best block
369+
self.peer_best_block = yield self.get_peer_best_block()
370+
assert self.peer_best_block is not None
349371

350-
# Find best block
351-
data = yield self.get_peer_best_block()
352-
peer_best_block = data['block']
353-
peer_best_height = data['height']
354-
self.peer_height = peer_best_height
372+
self.log.debug('run_sync_blocks', my_best_block=my_best_block, peer_best_block=self.peer_best_block)
355373

356374
# find best common block
357-
yield self.find_best_common_block(peer_best_height, peer_best_block)
358-
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)
375+
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
376+
assert self.synced_block is not None
377+
self.log.debug('run_sync_blocks', peer_best_block=self.peer_best_block, synced_block=self.synced_block)
359378

360-
if self.synced_height < self.peer_height:
379+
if self.synced_block.height < self.peer_best_block.height:
361380
# sync from common block
362-
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
363-
if peer_block_at_height:
364-
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
365-
elif my_height == self.synced_height == self.peer_height:
381+
self.run_block_sync(self.synced_block.id,
382+
self.synced_block.height,
383+
self.peer_best_block.id,
384+
self.peer_best_block.height)
385+
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
366386
# we're synced and on the same height, get their mempool
367387
self.state = PeerState.SYNCING_MEMPOOL
368388
self.mempool_manager.run()
@@ -494,68 +514,68 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
494514
return self.tx_storage.transaction_exists(vertex_id)
495515

496516
@inlineCallbacks
497-
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
517+
def find_best_common_block(self,
518+
my_best_block: BlockInfo,
519+
peer_best_block: BlockInfo) -> Generator[Any, Any, BlockInfo]:
498520
""" Search for the highest block/height where we're synced.
499521
"""
500-
assert self.tx_storage.indexes is not None
501-
my_best_height = self.tx_storage.get_height_best_block()
502-
503-
self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
522+
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)
504523

505-
if peer_best_height <= my_best_height:
506-
my_block = self.tx_storage.indexes.height.get(peer_best_height)
507-
if my_block == peer_best_block:
524+
if peer_best_block.height <= my_best_block.height:
525+
assert self.tx_storage.indexes is not None
526+
common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height)
527+
if peer_best_block.id == common_block_hash:
508528
# we have all the peer's blocks
509-
if peer_best_height == my_best_height:
529+
if peer_best_block.height == my_best_block.height:
510530
# We are in sync, ask for relay so the remote sends transactions in real time
511531
self.update_synced(True)
512532
self.send_relay()
513533
else:
514534
self.update_synced(False)
515-
516-
self.log.debug('synced to the latest peer block', height=peer_best_height)
517-
self.synced_height = peer_best_height
518-
return
535+
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
536+
return BlockInfo(height=peer_best_block.height, id=common_block_hash)
519537
else:
520-
# TODO peer is on a different best chain
521-
self.log.warn('peer on different chain', peer_height=peer_best_height,
522-
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
523-
None))
538+
# peer is on a different best chain
539+
self.log.warn('peer on different chain',
540+
peer_best_block=peer_best_block,
541+
my_best_block=my_best_block)
524542

525543
self.update_synced(False)
526-
not_synced = min(peer_best_height, my_best_height)
527-
synced = self.synced_height
528-
529-
while not_synced - synced > 1:
530-
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
531-
step = math.ceil((not_synced - synced)/10)
532-
heights = []
533-
height = synced
534-
while height < not_synced:
535-
heights.append(height)
536-
height += step
537-
heights.append(not_synced)
544+
545+
# Run an n-ary search in the interval [lo, hi).
546+
# `lo` is always a height where we are synced.
547+
# `hi` is always a height where sync state is unknown.
548+
hi = min(peer_best_block.height, my_best_block.height)
549+
lo = self.synced_block.height if self.synced_block else 0
550+
551+
last_block_hash = self._settings.GENESIS_BLOCK_HASH
552+
553+
while hi - lo > 1:
554+
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
555+
step = math.ceil((hi - lo) / 10)
556+
heights = list(range(lo, hi, step))
557+
heights.append(hi)
558+
538559
block_height_list = yield self.get_peer_block_hashes(heights)
539560
block_height_list.reverse()
540561
for height, block_hash in block_height_list:
541562
try:
542563
# We must check only fully validated transactions.
543564
blk = self.tx_storage.get_transaction(block_hash)
565+
except TransactionDoesNotExist:
566+
hi = height
567+
else:
544568
assert blk.get_metadata().validation.is_fully_connected()
545569
assert isinstance(blk, Block)
546-
if height != blk.get_height():
547-
# WTF?! It should never happen.
548-
self.state = PeerState.ERROR
549-
return
550-
synced = height
570+
assert height == blk.get_height()
571+
lo = height
572+
last_block_hash = block_hash
551573
break
552-
except TransactionDoesNotExist:
553-
not_synced = height
554574

555-
self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
556-
self.synced_height = synced
575+
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
576+
return BlockInfo(height=lo, id=last_block_hash)
557577

558-
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
578+
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[BlockInfo]]:
559579
""" Returns the peer's block hashes in the given heights.
560580
"""
561581
if self._deferred_peer_block_hashes is not None:
@@ -793,7 +813,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
793813
self.blockchain_streaming.stop()
794814
self.blockchain_streaming = None
795815

796-
def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
816+
def get_peer_best_block(self) -> Deferred[BlockInfo]:
797817
""" Async call to get the remote peer's best block.
798818
"""
799819
if self._deferred_best_block is not None:
@@ -813,21 +833,23 @@ def handle_get_best_block(self, payload: str) -> None:
813833
"""
814834
best_block = self.tx_storage.get_best_block()
815835
meta = best_block.get_metadata()
836+
assert meta.validation.is_fully_connected()
837+
# assert not meta.voided_by
816838
data = {'block': best_block.hash_hex, 'height': meta.height}
817839
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))
818840

819841
def handle_best_block(self, payload: str) -> None:
820842
""" Handle a BEST-BLOCK message.
821843
"""
822844
data = json.loads(payload)
823-
assert self.protocol.connections is not None
824-
self.log.debug('got best block', **data)
825-
data['block'] = bytes.fromhex(data['block'])
845+
_id = bytes.fromhex(data['block'])
846+
height = data['height']
847+
best_block = BlockInfo(height=height, id=_id)
826848

827849
deferred = self._deferred_best_block
828850
self._deferred_best_block = None
829851
if deferred:
830-
deferred.callback(data)
852+
deferred.callback(best_block)
831853

832854
def _setup_tx_streaming(self):
833855
""" Common setup before starting an outgoing transaction stream.

tests/p2p/test_get_best_blockchain.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
class BaseGetBestBlockchainTestCase(SimulatorTestCase):
2020

21+
seed_config = 6
22+
2123
def _send_cmd(self, proto, cmd, payload=None):
2224
if not payload:
2325
line = '{}\r\n'.format(cmd)

tests/p2p/test_sync.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,9 @@ def test_sync_metadata(self):
503503
# check they have the same consensus
504504
node_sync1 = conn.proto1.state.sync_agent
505505
node_sync2 = conn.proto2.state.sync_agent
506-
self.assertEqual(node_sync1.peer_height, height)
507-
self.assertEqual(node_sync1.synced_height, height)
508-
self.assertEqual(node_sync2.peer_height, height)
506+
self.assertEqual(node_sync1.peer_best_block.height, height)
507+
self.assertEqual(node_sync1.synced_block.height, height)
508+
self.assertEqual(node_sync2.peer_best_block.height, height)
509509
# 3 genesis + blocks + 8 txs
510510
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), height + 11)
511511
self.assertEqual(manager2.tx_storage.get_vertices_count(), height + 11)
@@ -527,14 +527,14 @@ def test_tx_propagation_nat_peers(self):
527527

528528
node_sync1 = self.conn1.proto1.state.sync_agent
529529
self.assertEqual(self.manager1.tx_storage.latest_timestamp, self.manager2.tx_storage.latest_timestamp)
530-
self.assertEqual(node_sync1.peer_height, node_sync1.synced_height)
531-
self.assertEqual(node_sync1.peer_height, self.manager1.tx_storage.get_height_best_block())
530+
self.assertEqual(node_sync1.peer_best_block, node_sync1.synced_block)
531+
self.assertEqual(node_sync1.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
532532
self.assertConsensusEqual(self.manager1, self.manager2)
533533

534534
node_sync2 = self.conn2.proto1.state.sync_agent
535535
self.assertEqual(self.manager2.tx_storage.latest_timestamp, self.manager3.tx_storage.latest_timestamp)
536-
self.assertEqual(node_sync2.peer_height, node_sync2.synced_height)
537-
self.assertEqual(node_sync2.peer_height, self.manager2.tx_storage.get_height_best_block())
536+
self.assertEqual(node_sync2.peer_best_block, node_sync2.synced_block)
537+
self.assertEqual(node_sync2.peer_best_block.height, self.manager2.tx_storage.get_height_best_block())
538538
self.assertConsensusEqual(self.manager2, self.manager3)
539539

540540
def test_block_sync_new_blocks_and_txs(self):
@@ -560,8 +560,8 @@ def test_block_sync_new_blocks_and_txs(self):
560560

561561
node_sync = conn.proto1.state.sync_agent
562562
self.assertEqual(self.manager1.tx_storage.latest_timestamp, manager2.tx_storage.latest_timestamp)
563-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
564-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
563+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
564+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
565565
self.assertConsensusEqual(self.manager1, manager2)
566566
self.assertConsensusValid(self.manager1)
567567
self.assertConsensusValid(manager2)
@@ -581,8 +581,8 @@ def test_block_sync_many_new_blocks(self):
581581
self.clock.advance(1)
582582

583583
node_sync = conn.proto1.state.sync_agent
584-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
585-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
584+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
585+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
586586
self.assertConsensusEqual(self.manager1, manager2)
587587
self.assertConsensusValid(self.manager1)
588588
self.assertConsensusValid(manager2)
@@ -602,8 +602,8 @@ def test_block_sync_new_blocks(self):
602602
self.clock.advance(1)
603603

604604
node_sync = conn.proto1.state.sync_agent
605-
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
606-
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
605+
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
606+
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
607607
self.assertConsensusEqual(self.manager1, manager2)
608608
self.assertConsensusValid(self.manager1)
609609
self.assertConsensusValid(manager2)
@@ -664,9 +664,9 @@ def test_full_sync(self):
664664

665665
node_sync1 = conn.proto1.state.sync_agent
666666
node_sync2 = conn.proto2.state.sync_agent
667-
self.assertEqual(node_sync1.peer_height, common_height)
668-
self.assertEqual(node_sync1.synced_height, common_height)
669-
self.assertEqual(node_sync2.peer_height, common_height)
667+
self.assertEqual(node_sync1.peer_best_block.height, common_height)
668+
self.assertEqual(node_sync1.synced_block.height, common_height)
669+
self.assertEqual(node_sync2.peer_best_block.height, common_height)
670670
self.assertConsensusValid(self.manager1)
671671
self.assertConsensusValid(manager2)
672672
self.assertConsensusEqual(self.manager1, manager2)
@@ -715,9 +715,9 @@ def test_block_sync_checkpoints(self):
715715
node_sync1 = conn.proto1.state.sync_agent
716716
node_sync2 = conn.proto2.state.sync_agent
717717

718-
self.assertEqual(node_sync1.peer_height, TOTAL_BLOCKS)
719-
self.assertEqual(node_sync1.synced_height, TOTAL_BLOCKS)
720-
self.assertEqual(node_sync2.peer_height, len(blocks))
718+
self.assertEqual(node_sync1.peer_best_block.height, TOTAL_BLOCKS)
719+
self.assertEqual(node_sync1.synced_block.height, TOTAL_BLOCKS)
720+
self.assertEqual(node_sync2.peer_best_block.height, len(blocks))
721721
self.assertConsensusValid(self.manager1)
722722
self.assertConsensusValid(manager2)
723723

@@ -738,8 +738,8 @@ def test_block_sync_only_genesis(self):
738738
self.clock.advance(1)
739739

740740
node_sync = conn.proto1.state.sync_agent
741-
self.assertEqual(node_sync.synced_height, 0)
742-
self.assertEqual(node_sync.peer_height, 0)
741+
self.assertEqual(node_sync.synced_block.height, 0)
742+
self.assertEqual(node_sync.peer_best_block.height, 0)
743743

744744
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), 3)
745745
self.assertEqual(manager2.tx_storage.get_vertices_count(), 3)

tests/p2p/test_sync_v2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
class BaseRandomSimulatorTestCase(SimulatorTestCase):
1616
__test__ = True
1717

18-
seed_config = 2
18+
seed_config = 5
1919

2020
def _get_partial_blocks(self, tx_storage):
2121
with tx_storage.allow_partially_validated_context():

tests/unittest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ def assertV1SyncedProgress(self, node_sync):
473473
self.assertEqual(node_sync.synced_timestamp, node_sync.peer_timestamp)
474474

475475
def assertV2SyncedProgress(self, node_sync):
476-
self.assertEqual(node_sync.synced_height, node_sync.peer_height)
476+
self.assertEqual(node_sync.synced_block, node_sync.peer_best_block)
477477

478478
def clean_tmpdirs(self):
479479
for tmpdir in self.tmpdirs:

0 commit comments

Comments
 (0)