Skip to content

Commit 4b04c9c

Browse files
committed
feat(sync-v2): Wait for sync internal methods to finish before initiating next syncing cycle
1 parent a289460 commit 4b04c9c

File tree

2 files changed

+52
-25
lines changed

2 files changed

+52
-25
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
117117
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
118118
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None
119119

120+
# Deferreds used when we are receiving a streaming of vertices.
121+
self._deferred_blockchain_streaming: Optional[Deferred[None]] = None
122+
self._deferred_transactions_streaming: Optional[Deferred[None]] = None
123+
120124
# When syncing blocks we start streaming with all peers
121125
# so the moment I get some repeated blocks, I stop the download
122126
# because it's probably a streaming that I've just received
@@ -289,16 +293,8 @@ def run_sync(self) -> Generator[Any, Any, None]:
289293
def _run_sync(self) -> Generator[Any, Any, None]:
290294
""" Actual implementation of the sync step logic in run_sync.
291295
"""
292-
if self.receiving_stream:
293-
# If we're receiving a stream, wait for it to finish before running sync.
294-
# If we're sending a stream, do the sync to update the peer's synced block
295-
self.log.debug('receiving stream, try again later')
296-
return
297-
298-
if self.mempool_manager.is_running():
299-
# It's running a mempool sync, so we wait until it finishes
300-
self.log.debug('running mempool sync, try again later')
301-
return
296+
assert not self.receiving_stream
297+
assert not self.mempool_manager.is_running()
302298

303299
assert self.protocol.connections is not None
304300
assert self.tx_storage.indexes is not None
@@ -308,16 +304,17 @@ def _run_sync(self) -> Generator[Any, Any, None]:
308304
self.log.debug('needed tx exist, sync transactions')
309305
self.update_synced(False)
310306
# TODO: find out whether we can sync transactions from this peer to speed things up
311-
self.run_sync_transactions()
307+
yield self.run_sync_transactions()
312308
return
313309

314310
is_block_synced = yield self.run_sync_blocks()
315311
if is_block_synced:
316312
# our blocks are synced, so sync the mempool
317313
self.state = PeerState.SYNCING_MEMPOOL
318-
self.mempool_manager.run()
314+
yield self.mempool_manager.run()
319315

320-
def run_sync_transactions(self) -> None:
316+
@inlineCallbacks
317+
def run_sync_transactions(self) -> Generator[Any, Any, None]:
321318
""" Run a step of the transaction syncing phase.
322319
"""
323320
self.state = PeerState.SYNCING_TRANSACTIONS
@@ -344,7 +341,7 @@ def run_sync_transactions(self) -> None:
344341

345342
self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(),
346343
end_block_height=block_height)
347-
self.send_get_transactions_bfs(needed_txs, block.hash)
344+
yield self.start_transactions_streaming(needed_txs, block.hash)
348345

349346
def get_my_best_block(self) -> _HeightInfo:
350347
"""Return my best block info."""
@@ -410,10 +407,11 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
410407
synced_block=self.synced_block)
411408

412409
# Sync from common block
413-
self.run_block_sync(self.synced_block.id,
414-
self.synced_block.height,
415-
self.peer_best_block.id,
416-
self.peer_best_block.height)
410+
yield self.start_blockchain_streaming(self.synced_block.id,
411+
self.synced_block.height,
412+
self.peer_best_block.id,
413+
self.peer_best_block.height)
414+
417415
return False
418416

419417
def get_tips(self) -> Deferred[list[bytes]]:
@@ -513,16 +511,20 @@ def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash:
513511
self._blk_stream_reverse = reverse
514512
self._last_received_block = None
515513

516-
def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int) -> None:
517-
""" Called when the bestblock is after all checkpoints.
518-
519-
It must syncs to the left until it reaches the remote's best block or the max stream limit.
520-
"""
514+
def start_blockchain_streaming(self,
515+
start_hash: bytes,
516+
start_height: int,
517+
end_hash: bytes,
518+
end_height: int) -> Deferred[None]:
519+
"""Request peer to start streaming blocks to us."""
520+
assert self._deferred_blockchain_streaming is None
521521
self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False)
522522
quantity = end_height - start_height
523523
self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity,
524524
start_hash=start_hash.hex(), end_hash=end_hash.hex())
525525
self.send_get_next_blocks(start_hash, end_hash, quantity)
526+
self._deferred_blockchain_streaming = Deferred()
527+
return self._deferred_blockchain_streaming
526528

527529
def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
528530
""" Helper to send a message.
@@ -728,6 +730,10 @@ def handle_blocks_end(self, payload: str) -> None:
728730
self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message')
729731
return
730732

733+
assert self._deferred_blockchain_streaming is not None
734+
self._deferred_blockchain_streaming.callback(None)
735+
self._deferred_blockchain_streaming = None
736+
731737
self.log.debug('block streaming ended', reason=str(response_code))
732738

733739
def handle_blocks(self, payload: str) -> None:
@@ -879,6 +885,13 @@ def _setup_tx_streaming(self):
879885
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant
880886
# XXX: what else can we add for checking if everything is going well?
881887

888+
def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]:
889+
"""Request peer to start streaming transactions to us."""
890+
assert self._deferred_transactions_streaming is None
891+
self.send_get_transactions_bfs(start_from, until_first_block)
892+
self._deferred_transactions_streaming = Deferred()
893+
return self._deferred_transactions_streaming
894+
882895
def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None:
883896
""" Send a GET-TRANSACTIONS-BFS message.
884897
@@ -971,6 +984,10 @@ def handle_transactions_end(self, payload: str) -> None:
971984
self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message')
972985
return
973986

987+
assert self._deferred_transactions_streaming is not None
988+
self._deferred_transactions_streaming.callback(None)
989+
self._deferred_transactions_streaming = None
990+
974991
self.log.debug('transaction streaming ended', reason=str(response_code))
975992

976993
def handle_transaction(self, payload: str) -> None:

hathor/p2p/sync_v2/mempool.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ def __init__(self, sync_agent: 'NodeBlockSync'):
3939
self.tx_storage = self.manager.tx_storage
4040
self.reactor = self.sync_agent.reactor
4141

42+
self._deferred: Optional[Deferred[None]] = None
43+
4244
# Set of tips we know but couldn't add to the DAG yet.
4345
self.missing_tips: set[bytes] = set()
4446

@@ -52,21 +54,29 @@ def is_running(self) -> bool:
5254
"""Whether the sync-mempool is currently running."""
5355
return self._is_running
5456

55-
def run(self) -> None:
57+
def run(self) -> Deferred[None]:
5658
"""Starts _run in, won't start again if already running."""
5759
if self.is_running():
5860
self.log.warn('already started')
59-
return
61+
assert self._deferred is not None
62+
return self._deferred
6063
self._is_running = True
6164
self.reactor.callLater(0, self._run)
6265

66+
assert self._deferred is None
67+
self._deferred = Deferred()
68+
return self._deferred
69+
6370
@inlineCallbacks
6471
def _run(self) -> Generator[Deferred, Any, None]:
6572
try:
6673
yield self._unsafe_run()
6774
finally:
6875
# sync_agent.run_sync will start it again when needed
6976
self._is_running = False
77+
assert self._deferred is not None
78+
self._deferred.callback(None)
79+
self._deferred = None
7080

7181
@inlineCallbacks
7282
def _unsafe_run(self) -> Generator[Deferred, Any, None]:

0 commit comments

Comments
 (0)