Skip to content

Commit 2730df9

Browse files
committed
review changes
1 parent 93c2fb9 commit 2730df9

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ def stop(self) -> None:
211211
if not self._started:
212212
raise Exception('NodeSyncBlock is already stopped')
213213
self._started = False
214-
self._lc_run.stop()
214+
if self._lc_run.running:
215+
self._lc_run.stop()
215216

216217
def get_cmd_dict(self) -> dict[ProtocolMessages, Callable[[str], None]]:
217218
""" Return a dict of messages of the plugin.
@@ -277,6 +278,9 @@ def run_sync(self) -> Generator[Any, Any, None]:
277278
self._is_running = True
278279
try:
279280
yield self._run_sync()
281+
except Exception:
282+
self.protocol.send_error_and_close_connection('internal error')
283+
self.log.error('unhandled exception', exc_info=True)
280284
finally:
281285
self._is_running = False
282286

@@ -796,7 +800,9 @@ def handle_best_block(self, payload: str) -> None:
796800

797801
def start_transactions_streaming(self, partial_blocks: list[Block]) -> Deferred[StreamEnd]:
798802
"""Request peer to start streaming transactions to us."""
799-
self._tx_streaming_client = TransactionStreamingClient(self, partial_blocks)
803+
self._tx_streaming_client = TransactionStreamingClient(self,
804+
partial_blocks,
805+
limit=self.DEFAULT_STREAMING_LIMIT)
800806

801807
start_from: list[bytes] = []
802808
first_block_hash = not_none(partial_blocks[0].hash)

hathor/p2p/sync_v2/transaction_streaming_client.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,8 @@
1717
from structlog import get_logger
1818
from twisted.internet.defer import Deferred
1919

20-
from hathor.p2p.sync_v2.exception import (
21-
InvalidVertexError,
22-
StreamingError,
23-
TooManyVerticesReceivedError,
24-
UnexpectedVertex,
25-
)
26-
from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, StreamEnd
20+
from hathor.p2p.sync_v2.exception import InvalidVertexError, StreamingError, TooManyVerticesReceivedError
21+
from hathor.p2p.sync_v2.streamers import StreamEnd
2722
from hathor.transaction import BaseTransaction
2823
from hathor.transaction.exceptions import HathorError, TxValidationError
2924
from hathor.types import VertexId
@@ -38,7 +33,9 @@
3833
class TransactionStreamingClient:
3934
def __init__(self,
4035
sync_agent: 'NodeBlockSync',
41-
partial_blocks: list['Block']) -> None:
36+
partial_blocks: list['Block'],
37+
*,
38+
limit: int) -> None:
4239
self.sync_agent = sync_agent
4340
self.protocol = self.sync_agent.protocol
4441
self.tx_storage = self.sync_agent.tx_storage
@@ -52,7 +49,7 @@ def __init__(self,
5249

5350
self._tx_received: int = 0
5451

55-
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT
52+
self._tx_max_quantity = limit
5653

5754
self._idx: int = 0
5855
self._buffer: list[VertexId] = []
@@ -68,6 +65,7 @@ def wait(self) -> Deferred[StreamEnd]:
6865
def resume(self) -> Deferred[StreamEnd]:
6966
"""Resume receiving vertices."""
7067
assert self._deferred.called
68+
self._tx_received = 0
7169
self._deferred = Deferred()
7270
return self._deferred
7371

@@ -95,15 +93,21 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
9593
# Run basic verification.
9694
if not tx.is_genesis:
9795
try:
98-
self.manager.verification_service.validate_basic(tx)
96+
self.manager.verification_service.verify_basic(tx)
9997
except TxValidationError as e:
10098
self.fails(InvalidVertexError(repr(e)))
10199
return
102100

103101
# Any repeated transaction will fail this check because they will
104102
# not belong to the waiting list.
105103
if tx.hash not in self._waiting_for:
106-
self.fails(UnexpectedVertex(tx.hash.hex()))
104+
if tx.hash in self._db:
105+
# This case might happen during a resume, so we just log and keep syncing.
106+
self.log.info('duplicated vertex received', tx_id=tx.hash.hex())
107+
else:
108+
# TODO Uncomment the following code to fail on receiving unexpected vertices.
109+
# self.fails(UnexpectedVertex(tx.hash.hex()))
110+
self.log.info('unexpected vertex received', tx_id=tx.hash.hex())
107111
return
108112
self._waiting_for.remove(tx.hash)
109113

tests/p2p/test_sync_v2.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,19 @@ def test_exceeds_streaming_and_mempool_limits(self) -> None:
219219
# Let the connection start to sync.
220220
self.simulator.run(1)
221221

222+
new_streaming_limit = 30
223+
222224
# Change manager1 default streaming and mempool limits.
223225
sync1 = conn12.proto1.state.sync_agent
224-
sync1.DEFAULT_STREAMING_LIMIT = 30
225-
sync1.mempool_manager.MAX_STACK_LENGTH = 30
226+
sync1.DEFAULT_STREAMING_LIMIT = new_streaming_limit
227+
sync1.mempool_manager.MAX_STACK_LENGTH = new_streaming_limit
226228
self.assertIsNone(sync1._blk_streaming_server)
227229
self.assertIsNone(sync1._tx_streaming_server)
228230

229231
# Change manager2 default streaming and mempool limits.
230232
sync2 = conn12.proto2.state.sync_agent
231-
sync2.DEFAULT_STREAMING_LIMIT = 50
232-
sync2.mempool_manager.MAX_STACK_LENGTH = 50
233+
sync2.DEFAULT_STREAMING_LIMIT = new_streaming_limit
234+
sync2.mempool_manager.MAX_STACK_LENGTH = new_streaming_limit
233235
self.assertIsNone(sync2._blk_streaming_server)
234236
self.assertIsNone(sync2._tx_streaming_server)
235237

0 commit comments

Comments
 (0)