@@ -117,6 +117,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
117
117
self ._deferred_best_block : Optional [Deferred [_HeightInfo ]] = None
118
118
self ._deferred_peer_block_hashes : Optional [Deferred [list [_HeightInfo ]]] = None
119
119
120
+ # Deferreds used when we are receiving a streaming of vertices.
121
+ self ._deferred_blockchain_streaming : Optional [Deferred [None ]] = None
122
+ self ._deferred_transactions_streaming : Optional [Deferred [None ]] = None
123
+
120
124
# When syncing blocks we start streaming with all peers
121
125
# so the moment I get some repeated blocks, I stop the download
122
126
# because it's probably a streaming that I've just received
@@ -289,16 +293,8 @@ def run_sync(self) -> Generator[Any, Any, None]:
289
293
def _run_sync (self ) -> Generator [Any , Any , None ]:
290
294
""" Actual implementation of the sync step logic in run_sync.
291
295
"""
292
- if self .receiving_stream :
293
- # If we're receiving a stream, wait for it to finish before running sync.
294
- # If we're sending a stream, do the sync to update the peer's synced block
295
- self .log .debug ('receiving stream, try again later' )
296
- return
297
-
298
- if self .mempool_manager .is_running ():
299
- # It's running a mempool sync, so we wait until it finishes
300
- self .log .debug ('running mempool sync, try again later' )
301
- return
296
+ assert not self .receiving_stream
297
+ assert not self .mempool_manager .is_running ()
302
298
303
299
assert self .protocol .connections is not None
304
300
assert self .tx_storage .indexes is not None
@@ -308,16 +304,17 @@ def _run_sync(self) -> Generator[Any, Any, None]:
308
304
self .log .debug ('needed tx exist, sync transactions' )
309
305
self .update_synced (False )
310
306
# TODO: find out whether we can sync transactions from this peer to speed things up
311
- self .run_sync_transactions ()
307
+ yield self .run_sync_transactions ()
312
308
return
313
309
314
310
is_block_synced = yield self .run_sync_blocks ()
315
311
if is_block_synced :
316
312
# our blocks are synced, so sync the mempool
317
313
self .state = PeerState .SYNCING_MEMPOOL
318
- self .mempool_manager .run ()
314
+ yield self .mempool_manager .run ()
319
315
320
- def run_sync_transactions (self ) -> None :
316
+ @inlineCallbacks
317
+ def run_sync_transactions (self ) -> Generator [Any , Any , None ]:
321
318
""" Run a step of the transaction syncing phase.
322
319
"""
323
320
self .state = PeerState .SYNCING_TRANSACTIONS
@@ -344,7 +341,7 @@ def run_sync_transactions(self) -> None:
344
341
345
342
self .log .info ('run sync transactions' , start = [i .hex () for i in needed_txs ], end_block_hash = block .hash .hex (),
346
343
end_block_height = block_height )
347
- self .send_get_transactions_bfs (needed_txs , block .hash )
344
+ yield self .start_transactions_streaming (needed_txs , block .hash )
348
345
349
346
def get_my_best_block (self ) -> _HeightInfo :
350
347
"""Return my best block info."""
@@ -410,10 +407,11 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
410
407
synced_block = self .synced_block )
411
408
412
409
# Sync from common block
413
- self .run_block_sync (self .synced_block .id ,
414
- self .synced_block .height ,
415
- self .peer_best_block .id ,
416
- self .peer_best_block .height )
410
+ yield self .start_blockchain_streaming (self .synced_block .id ,
411
+ self .synced_block .height ,
412
+ self .peer_best_block .id ,
413
+ self .peer_best_block .height )
414
+
417
415
return False
418
416
419
417
def get_tips (self ) -> Deferred [list [bytes ]]:
@@ -513,16 +511,20 @@ def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash:
513
511
self ._blk_stream_reverse = reverse
514
512
self ._last_received_block = None
515
513
516
- def run_block_sync (self , start_hash : bytes , start_height : int , end_hash : bytes , end_height : int ) -> None :
517
- """ Called when the bestblock is after all checkpoints.
518
-
519
- It must syncs to the left until it reaches the remote's best block or the max stream limit.
520
- """
514
+ def start_blockchain_streaming (self ,
515
+ start_hash : bytes ,
516
+ start_height : int ,
517
+ end_hash : bytes ,
518
+ end_height : int ) -> Deferred [None ]:
519
+ """Request peer to start streaming blocks to us."""
520
+ assert self ._deferred_blockchain_streaming is None
521
521
self ._setup_block_streaming (start_hash , start_height , end_hash , end_height , False )
522
522
quantity = end_height - start_height
523
523
self .log .info ('get next blocks' , start_height = start_height , end_height = end_height , quantity = quantity ,
524
524
start_hash = start_hash .hex (), end_hash = end_hash .hex ())
525
525
self .send_get_next_blocks (start_hash , end_hash , quantity )
526
+ self ._deferred_blockchain_streaming = Deferred ()
527
+ return self ._deferred_blockchain_streaming
526
528
527
529
def send_message (self , cmd : ProtocolMessages , payload : Optional [str ] = None ) -> None :
528
530
""" Helper to send a message.
@@ -728,6 +730,10 @@ def handle_blocks_end(self, payload: str) -> None:
728
730
self .protocol .send_error_and_close_connection ('Not expecting to receive BLOCKS-END message' )
729
731
return
730
732
733
+ assert self ._deferred_blockchain_streaming is not None
734
+ self ._deferred_blockchain_streaming .callback (None )
735
+ self ._deferred_blockchain_streaming = None
736
+
731
737
self .log .debug ('block streaming ended' , reason = str (response_code ))
732
738
733
739
def handle_blocks (self , payload : str ) -> None :
@@ -879,6 +885,13 @@ def _setup_tx_streaming(self):
879
885
self ._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant
880
886
# XXX: what else can we add for checking if everything is going well?
881
887
888
+ def start_transactions_streaming (self , start_from : list [bytes ], until_first_block : bytes ) -> Deferred [None ]:
889
+ """Request peer to start streaming transactions to us."""
890
+ assert self ._deferred_transactions_streaming is None
891
+ self .send_get_transactions_bfs (start_from , until_first_block )
892
+ self ._deferred_transactions_streaming = Deferred ()
893
+ return self ._deferred_transactions_streaming
894
+
882
895
def send_get_transactions_bfs (self , start_from : list [bytes ], until_first_block : bytes ) -> None :
883
896
""" Send a GET-TRANSACTIONS-BFS message.
884
897
@@ -971,6 +984,10 @@ def handle_transactions_end(self, payload: str) -> None:
971
984
self .protocol .send_error_and_close_connection ('Not expecting to receive TRANSACTIONS-END message' )
972
985
return
973
986
987
+ assert self ._deferred_transactions_streaming is not None
988
+ self ._deferred_transactions_streaming .callback (None )
989
+ self ._deferred_transactions_streaming = None
990
+
974
991
self .log .debug ('transaction streaming ended' , reason = str (response_code ))
975
992
976
993
def handle_transaction (self , payload : str ) -> None :
0 commit comments