Skip to content

Commit 6e41808

Browse files
committed
1 parent dc650f3 commit 6e41808

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

hathor/p2p/sync_v2/agent.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class PeerState(Enum):
5353
SYNCING_MEMPOOL = 'syncing-mempool'
5454

5555

56+
class _GetDataOrigin(Enum):
57+
MEMPOOL = 'mempool'
58+
59+
5660
class NodeBlockSync(SyncAgent):
5761
""" An algorithm to sync two peers based on their blockchain.
5862
"""
@@ -1000,33 +1004,34 @@ def handle_transaction(self, payload: str) -> None:
10001004
self.log.debug('tx streaming in progress', txs_received=self._tx_received)
10011005

10021006
@inlineCallbacks
1003-
def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]:
1007+
def get_tx_mempool(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]:
10041008
""" Async method to get a transaction from the db/cache or to download it.
10051009
"""
1010+
assert self.state is PeerState.SYNCING_MEMPOOL, 'get_tx_mempool must only be called on mempool state'
10061011
tx = self._get_tx_cache.get(tx_id)
10071012
if tx is not None:
10081013
self.log.debug('tx in cache', tx=tx_id.hex())
10091014
return tx
10101015
try:
10111016
tx = self.tx_storage.get_transaction(tx_id)
10121017
except TransactionDoesNotExist:
1013-
tx = yield self.get_data(tx_id, 'mempool')
1018+
tx = yield self.get_data(tx_id, _GetDataOrigin.MEMPOOL)
10141019
assert tx is not None
10151020
if tx.hash != tx_id:
10161021
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} hash mismatch')
10171022
raise
10181023
return tx
10191024

1020-
def get_data(self, tx_id: bytes, origin: str) -> Deferred[BaseTransaction]:
1025+
def get_data(self, tx_id: bytes, origin: _GetDataOrigin) -> Deferred[BaseTransaction]:
10211026
""" Async method to request a tx by id.
10221027
"""
10231028
# TODO: deal with stale `get_data` calls
1024-
if origin != 'mempool':
1029+
if origin is not _GetDataOrigin.MEMPOOL:
10251030
raise ValueError(f'origin={origin} not supported, only origin=mempool is supported')
10261031
deferred = self._deferred_txs.get(tx_id, None)
10271032
if deferred is None:
10281033
deferred = self._deferred_txs[tx_id] = Deferred()
1029-
self.send_get_data(tx_id, origin=origin)
1034+
self.send_get_data(tx_id, origin=origin.name)
10301035
self.log.debug('get_data of new tx_id', deferred=deferred, key=tx_id.hex())
10311036
else:
10321037
# XXX: can we re-use deferred objects like this?

hathor/p2p/sync_v2/mempool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def _unsafe_run(self) -> Generator[Deferred, Any, None]:
7979
while self.missing_tips:
8080
self.log.debug('We have missing tips! Let\'s start!', missing_tips=[x.hex() for x in self.missing_tips])
8181
tx_id = next(iter(self.missing_tips))
82-
tx: BaseTransaction = yield self.sync_agent.get_tx(tx_id)
82+
tx: BaseTransaction = yield self.sync_agent.get_tx_mempool(tx_id)
8383
# Stack used by the DFS in the dependencies.
8484
# We use a deque for performance reasons.
8585
self.log.debug('start mempool DSF', tx=tx.hash_hex)
@@ -98,7 +98,7 @@ def _dfs(self, stack: deque[BaseTransaction]) -> Generator[Deferred, Any, None]:
9898
assert tx == stack.pop()
9999
else:
100100
self.log.debug('Iterate in the DFS.', missing_dep=missing_dep.hex())
101-
tx_dep = yield self.sync_agent.get_tx(missing_dep)
101+
tx_dep = yield self.sync_agent.get_tx_mempool(missing_dep)
102102
stack.append(tx_dep)
103103
if len(stack) > self.MAX_STACK_LENGTH:
104104
stack.popleft()

0 commit comments

Comments
 (0)