Skip to content

Commit b29481a

Browse files
committed
resume streaming if max tx limit is reached
1 parent a3658a7 commit b29481a

File tree

3 files changed

+52
-7
lines changed

3 files changed

+52
-7
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,13 +372,17 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
372372
if partial_blocks:
373373
self.state = PeerState.SYNCING_TRANSACTIONS
374374
try:
375-
yield self.start_transactions_streaming(partial_blocks)
375+
reason = yield self.start_transactions_streaming(partial_blocks)
376376
except StreamingError as e:
377377
self.log.info('tx streaming failed', reason=repr(e))
378378
self.send_stop_transactions_streaming()
379379
self.receiving_stream = False
380380
return False
381381

382+
self.log.info('tx streaming finished', reason=reason)
383+
while reason == StreamEnd.LIMIT_EXCEEDED:
384+
reason = yield self.resume_transactions_streaming()
385+
382386
self._blk_streaming_client = None
383387
self._tx_streaming_client = None
384388
return False
@@ -631,7 +635,10 @@ def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) ->
631635
self.log.debug('requested start_hash not found', start_hash=start_hash.hex())
632636
self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex())
633637
return
634-
assert isinstance(blk, Block)
638+
if not isinstance(blk, Block):
639+
self.log.debug('request start_hash is not a block', start_hash=start_hash.hex())
640+
self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex())
641+
return
635642
assert blk.hash is not None
636643
# XXX: it is not an error for the other peer to request a voided block, we'll pretend it doesn't exist, butf
637644
blk_meta = blk.get_metadata()
@@ -801,6 +808,21 @@ def start_transactions_streaming(self, partial_blocks: list[Block]) -> Deferred[
801808
self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
802809
return self._tx_streaming_client.wait()
803810

811+
def resume_transactions_streaming(self) -> Deferred[StreamEnd]:
812+
"""Resume transaction streaming."""
813+
idx = self._tx_streaming_client._idx
814+
partial_blocks = self._tx_streaming_client.partial_blocks[idx:]
815+
assert partial_blocks
816+
start_from = list(self._tx_streaming_client._waiting_for)
817+
first_block_hash = not_none(partial_blocks[0].hash)
818+
last_block_hash = not_none(partial_blocks[-1].hash)
819+
self.log.info('requesting transactions streaming',
820+
start_from=[x.hex() for x in start_from],
821+
first_block=first_block_hash.hex(),
822+
last_block=last_block_hash.hex())
823+
self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
824+
return self._tx_streaming_client.resume()
825+
804826
def send_get_transactions_bfs(self,
805827
start_from: list[bytes],
806828
first_block_hash: bytes,

hathor/p2p/sync_v2/streamers.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from enum import IntFlag
16-
from typing import TYPE_CHECKING, Iterator, Optional, cast
16+
from typing import TYPE_CHECKING, Iterable, Iterator, Optional, Union, cast
1717

1818
from structlog import get_logger
1919
from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer
@@ -39,6 +39,7 @@ class StreamEnd(IntFlag):
3939
LIMIT_EXCEEDED = 2
4040
STREAM_BECAME_VOIDED = 3 # this will happen when the current chain becomes voided while it is being sent
4141
TX_NOT_CONFIRMED = 4
42+
INVALID_PARAMS = 5
4243

4344
def __str__(self):
4445
if self is StreamEnd.END_HASH_REACHED:
@@ -191,6 +192,10 @@ def send_next(self) -> None:
191192

192193
class TransactionsStreamingServer(_StreamingServerBase):
193194
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
195+
196+
If the start_from parameter is not empty, the BFS (Breadth-First Search) for the first block will commence
197+
using the provided hashes. This mechanism enables streaming requests to continue from a specific point
198+
should there be interruptions or issues.
194199
"""
195200

196201
def __init__(self,
@@ -206,28 +211,39 @@ def __init__(self,
206211

207212
self.first_block: Block = cast(Block, self.tx_storage.get_transaction(first_block_hash))
208213
self.last_block: Block = cast(Block, self.tx_storage.get_transaction(last_block_hash))
214+
self.start_from = start_from
209215

210216
assert isinstance(self.first_block, Block)
211217
assert isinstance(self.last_block, Block)
212218

213-
self.current_block: Optional[Block] = self.first_block
214-
215219
# TODO Validated start_from and use it when it's not empty.
216220

221+
self.current_block: Optional[Block] = self.first_block
217222
self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False)
218223
self.iter = self.get_iter()
219224

220225
def get_iter(self) -> Iterator[BaseTransaction]:
221226
"""Return an iterator that yields all transactions confirmed by each block in sequence."""
227+
root: Union[BaseTransaction, Iterable[BaseTransaction]]
228+
skip_root: bool
222229
while self.current_block:
230+
if not self.start_from:
231+
root = self.current_block
232+
skip_root = True
233+
else:
234+
root = self.start_from
235+
skip_root = False
223236
self.log.debug('sending transactions from block',
224237
block=not_none(self.current_block.hash).hex(),
225-
height=self.current_block.get_height())
226-
it = self.bfs.run(self.current_block, skip_root=True)
238+
height=self.current_block.get_height(),
239+
start_from=self.start_from,
240+
skip_root=skip_root)
241+
it = self.bfs.run(root, skip_root=skip_root)
227242
yield from it
228243
if self.current_block == self.last_block:
229244
break
230245
self.current_block = self.current_block.get_next_block_best_chain()
246+
self.start_from.clear()
231247

232248
def send_next(self) -> None:
233249
"""Push next transaction to peer."""

hathor/p2p/sync_v2/transaction_streaming_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ def wait(self) -> Deferred[StreamEnd]:
6565
"""Return the deferred."""
6666
return self._deferred
6767

68+
def resume(self) -> Deferred[StreamEnd]:
69+
"""Resume receiving vertices."""
70+
assert self._deferred.called
71+
self._deferred = Deferred()
72+
return self._deferred
73+
6874
def fails(self, reason: 'StreamingError') -> None:
6975
"""Fail the execution by resolving the deferred with an error."""
7076
self._deferred.errback(reason)
@@ -135,6 +141,7 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None:
135141
"""This method is called by the sync agent when a TRANSACTIONS-END message is received."""
136142
if self._deferred.called:
137143
return
144+
self.log.info('transactions streaming ended', waiting_for=len(self._waiting_for))
138145
self._deferred.callback(response_code)
139146

140147
def _execute_and_prepare_next(self) -> None:

0 commit comments

Comments
 (0)