Skip to content

Commit f65fdb4

Browse files
committed
fix(sync-v2): stop _process_transaction on error
1 parent 2cc841b commit f65fdb4

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

hathor/p2p/sync_v2/transaction_streaming_client.py

+6
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ def resume(self) -> Deferred[StreamEnd]:
100100

101101
def fails(self, reason: 'StreamingError') -> None:
102102
"""Fail the execution by resolving the deferred with an error."""
103+
if self._deferred.called:
104+
self.log.warn('already failed before', new_reason=repr(reason))
105+
return
103106
self._deferred.errback(reason)
104107

105108
def handle_transaction(self, tx: BaseTransaction) -> None:
@@ -126,6 +129,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None:
126129
@inlineCallbacks
127130
def process_queue(self) -> Generator[Any, Any, None]:
128131
"""Process next transaction in the queue."""
132+
if self._deferred.called:
133+
return
134+
129135
if self._is_processing:
130136
return
131137

tests/p2p/test_sync_v2.py

+58-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
1+
import base64
2+
import re
3+
14
import pytest
25
from twisted.internet.defer import inlineCallbacks, succeed
36
from twisted.python.failure import Failure
47

58
from hathor.conf import HathorSettings
9+
from hathor.p2p.messages import ProtocolMessages
610
from hathor.p2p.peer_id import PeerId
711
from hathor.p2p.sync_v2.agent import _HeightInfo
812
from hathor.simulator import FakeConnection
9-
from hathor.simulator.trigger import StopAfterNMinedBlocks, StopAfterNTransactions, StopWhenTrue, Trigger
13+
from hathor.simulator.trigger import (
14+
StopAfterNMinedBlocks,
15+
StopAfterNTransactions,
16+
StopWhenSendLineMatch,
17+
StopWhenTrue,
18+
Trigger,
19+
)
1020
from hathor.transaction.storage.traversal import DFSWalk
1121
from tests.simulation.base import SimulatorTestCase
1222
from tests.utils import HAS_ROCKSDB
@@ -321,3 +331,50 @@ def fake_get_peer_block_hashes(heights):
321331
sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes
322332
common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block)
323333
self.assertIsNone(common_block_info)
334+
335+
def test_multiple_unexpected_txs(self) -> None:
336+
manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
337+
manager1.allow_mining_without_peers()
338+
339+
# mine some blocks (10, could be any amount)
340+
miner1 = self.simulator.create_miner(manager1, hashpower=10e6)
341+
miner1.start()
342+
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=100)))
343+
miner1.stop()
344+
345+
# generate some transactions (10, could by any amount >1)
346+
gen_tx1 = self.simulator.create_tx_generator(manager1, rate=3., hashpower=10e9, ignore_no_funds=True)
347+
gen_tx1.start()
348+
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNTransactions(gen_tx1, quantity=10)))
349+
gen_tx1.stop()
350+
351+
# mine some blocks (2 to be sure, 1 should be enough)
352+
miner1.start()
353+
self.assertTrue(self.simulator.run(3 * 3600, trigger=StopAfterNMinedBlocks(miner1, quantity=2)))
354+
miner1.stop()
355+
356+
# create a new peer and run sync and stop when it requests transactions, so we can inject it with invalid ones
357+
manager2 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
358+
conn12 = FakeConnection(manager1, manager2, latency=0.05)
359+
self.simulator.add_connection(conn12)
360+
regex = re.compile(rf'{ProtocolMessages.GET_TRANSACTIONS_BFS.value} '.encode('ascii'))
361+
self.assertTrue(self.simulator.run(2 * 60, trigger=StopWhenSendLineMatch(conn12._proto2, regex)))
362+
363+
# make up some transactions that the node isn't expecting
364+
best_block = manager1.tx_storage.get_best_block()
365+
existing_tx = manager1.tx_storage.get_transaction(list(best_block.get_tx_parents())[0])
366+
fake_txs = []
367+
for i in range(3):
368+
fake_tx = existing_tx.clone()
369+
fake_tx.timestamp += 1 + i # incrementally add timestamp so something is guaranteed to change
370+
manager1.cpu_mining_service.resolve(fake_tx)
371+
fake_txs.append(fake_tx)
372+
373+
# send fake transactions to manager2, before the fix the first should fail with no issue, but the second would
374+
# end up on an AlreadyCalledError because the deferred.errback will be called twice
375+
for fake_tx in fake_txs:
376+
sync_node2 = conn12.proto2.state.sync_agent
377+
sync_node2.handle_transaction(base64.b64encode(fake_tx.get_struct()).decode())
378+
379+
# force the processing of async code, nothing should break
380+
self.simulator.run(0)

0 commit comments

Comments
 (0)