Skip to content

Commit 36692d0

Browse files
committed
refactor(vertex-handler): remove p2p_manager dependency
1 parent d569b9a commit 36692d0

File tree

10 files changed

+39
-45
lines changed

10 files changed

+39
-45
lines changed

hathor/builder/builder.py

-1
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
614614
tx_storage=self._get_or_create_tx_storage(),
615615
verification_service=self._get_or_create_verification_service(),
616616
consensus=self._get_or_create_consensus(),
617-
p2p_manager=self._get_or_create_p2p_manager(),
618617
feature_service=self._get_or_create_feature_service(),
619618
pubsub=self._get_or_create_pubsub(),
620619
wallet=self._get_or_create_wallet(),

hathor/builder/cli_builder.py

-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
334334
tx_storage=tx_storage,
335335
verification_service=verification_service,
336336
consensus=consensus_algorithm,
337-
p2p_manager=p2p_manager,
338337
feature_service=self.feature_service,
339338
pubsub=pubsub,
340339
wallet=self.wallet,

hathor/manager.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
from hathor.p2p.manager import ConnectionsManager
4949
from hathor.p2p.peer import PrivatePeer
5050
from hathor.p2p.peer_id import PeerId
51-
from hathor.profiler import get_cpu_profiler
5251
from hathor.pubsub import HathorEvents, PubSubManager
5352
from hathor.reactor import ReactorProtocol as Reactor
5453
from hathor.reward_lock import is_spent_reward_locked
@@ -69,7 +68,6 @@
6968
from hathor.websocket.factory import HathorAdminWebsocketFactory
7069

7170
logger = get_logger()
72-
cpu = get_cpu_profiler()
7371

7472

7573
class HathorManager:
@@ -171,8 +169,6 @@ def __init__(
171169

172170
self.is_started: bool = False
173171

174-
self.cpu = cpu
175-
176172
# XXX: first checkpoint must be genesis (height=0)
177173
self.checkpoints: list[Checkpoint] = checkpoints or []
178174
self.checkpoints_ready: list[bool] = [False] * len(self.checkpoints)
@@ -960,7 +956,6 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool
960956

961957
return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True)
962958

963-
@cpu.profiler('on_new_tx')
964959
def on_new_tx(
965960
self,
966961
tx: BaseTransaction,
@@ -977,14 +972,18 @@ def on_new_tx(
977972
:param fails_silently: if False will raise an exception when tx cannot be added
978973
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
979974
"""
980-
return self.vertex_handler.on_new_vertex(
975+
result = self.vertex_handler.on_new_vertex(
981976
tx,
982977
quiet=quiet,
983978
fails_silently=fails_silently,
984-
propagate_to_peers=propagate_to_peers,
985979
reject_locked_reward=reject_locked_reward,
986980
)
987981

982+
if propagate_to_peers and result:
983+
self.connections.send_tx_to_peers(tx)
984+
985+
return result
986+
988987
def has_sync_version_capability(self) -> bool:
989988
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities
990989

hathor/p2p/sync_v1/agent.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,9 @@ def handle_data(self, payload: str) -> None:
636636
self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
637637
# If we have not requested the data, it is a new transaction being propagated
638638
# in the network, thus, we propagate it as well.
639-
result = self.manager.on_new_tx(tx, propagate_to_peers=True)
639+
result = self.manager.vertex_handler.on_new_vertex(tx)
640+
if result:
641+
self.protocol.connections.send_tx_to_peers(tx)
640642
self.update_received_stats(tx, result)
641643

642644
def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
@@ -685,7 +687,9 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
685687
success = True
686688
else:
687689
# Add tx to the DAG.
688-
success = self.manager.on_new_tx(tx)
690+
success = self.manager.vertex_handler.on_new_vertex(tx)
691+
if success:
692+
self.protocol.connections.send_tx_to_peers(tx)
689693
# Updating stats data
690694
self.update_received_stats(tx, success)
691695
return tx

hathor/p2p/sync_v2/agent.py

+9-11
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,9 @@ def handle_tips(self, payload: str) -> None:
488488
data = [bytes.fromhex(x) for x in data]
489489
# filter-out txs we already have
490490
try:
491-
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
491+
self._receiving_tips.extend(
492+
VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id)
493+
)
492494
except ValueError:
493495
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
494496
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
@@ -553,12 +555,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) ->
553555
assert self.protocol.state is not None
554556
self.protocol.state.send_message(cmd, payload)
555557

556-
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
557-
""" Return true if the vertex exists no matter its validation state.
558-
"""
559-
with self.tx_storage.allow_partially_validated_context():
560-
return self.tx_storage.transaction_exists(vertex_id)
561-
562558
@inlineCallbacks
563559
def find_best_common_block(self,
564560
my_best_block: _HeightInfo,
@@ -621,11 +617,11 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G
621617
try:
622618
for tx in vertex_list:
623619
if not self.tx_storage.transaction_exists(tx.hash):
624-
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=False, fails_silently=False)
620+
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
625621
yield deferLater(self.reactor, 0, lambda: None)
626622

627623
if not self.tx_storage.transaction_exists(blk.hash):
628-
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
624+
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
629625
except InvalidNewTransaction:
630626
self.protocol.send_error_and_close_connection('invalid vertex received')
631627

@@ -1163,7 +1159,7 @@ def handle_data(self, payload: str) -> None:
11631159

11641160
tx.storage = self.protocol.node.tx_storage
11651161

1166-
if self.partial_vertex_exists(tx.hash):
1162+
if self.tx_storage.partial_vertex_exists(tx.hash):
11671163
# transaction already added to the storage, ignore it
11681164
# XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs
11691165
self.tx_storage.compare_bytes_with_local_tx(tx)
@@ -1174,7 +1170,9 @@ def handle_data(self, payload: str) -> None:
11741170
if self.tx_storage.can_validate_full(tx):
11751171
self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
11761172
try:
1177-
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=True, fails_silently=False)
1173+
result = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
1174+
if result:
1175+
self.protocol.connections.send_tx_to_peers(tx)
11781176
except InvalidNewTransaction:
11791177
self.protocol.send_error_and_close_connection('invalid vertex received')
11801178
else:

hathor/p2p/sync_v2/blockchain_streaming_client.py

+2-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from hathor.p2p.sync_v2.streamers import StreamEnd
2828
from hathor.transaction import Block
2929
from hathor.transaction.exceptions import HathorError
30-
from hathor.types import VertexId
3130

3231
if TYPE_CHECKING:
3332
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
@@ -75,11 +74,6 @@ def fails(self, reason: 'StreamingError') -> None:
7574
"""Fail the execution by resolving the deferred with an error."""
7675
self._deferred.errback(reason)
7776

78-
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
79-
"""Return true if the vertex exists no matter its validation state."""
80-
with self.tx_storage.allow_partially_validated_context():
81-
return self.tx_storage.transaction_exists(vertex_id)
82-
8377
def handle_blocks(self, blk: Block) -> None:
8478
"""This method is called by the sync agent when a BLOCKS message is received."""
8579
if self._deferred.called:
@@ -105,7 +99,7 @@ def handle_blocks(self, blk: Block) -> None:
10599

106100
# Check for repeated blocks.
107101
is_duplicated = False
108-
if self.partial_vertex_exists(blk.hash):
102+
if self.tx_storage.partial_vertex_exists(blk.hash):
109103
# We reached a block we already have. Skip it.
110104
self._blk_repeated += 1
111105
is_duplicated = True
@@ -132,7 +126,7 @@ def handle_blocks(self, blk: Block) -> None:
132126

133127
if self.tx_storage.can_validate_full(blk):
134128
try:
135-
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
129+
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
136130
except HathorError:
137131
self.fails(InvalidVertexError(blk.hash.hex()))
138132
return

hathor/p2p/sync_v2/mempool.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ def _add_tx(self, tx: BaseTransaction) -> None:
140140
if self.tx_storage.transaction_exists(tx.hash):
141141
return
142142
try:
143-
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
143+
result = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
144+
if result:
145+
self.sync_agent.protocol.connections.send_tx_to_peers(tx)
144146
except InvalidNewTransaction:
145147
self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received')
146148
raise

hathor/profiler/cpu.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
import time
1616
from collections import defaultdict
1717
from functools import wraps
18-
from typing import Any, Callable, Union
18+
from typing import Callable, ParamSpec, TypeVar, Union
1919

2020
from twisted.internet.task import LoopingCall
2121

2222
Key = tuple[str, ...]
2323

24+
T = TypeVar('T')
25+
P = ParamSpec('P')
26+
2427

2528
class ProcItem:
2629
"""Store information for each process."""
@@ -184,7 +187,7 @@ def update(self) -> None:
184187
t1 = time.process_time()
185188
self.measures[('profiler',)].add_time(t1 - t0)
186189

187-
def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[..., Any]], Any]:
190+
def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[P, T]], Callable[P, T]]:
188191
"""Decorator to collect data. The `key` must be the key itself
189192
or a method that returns the key.
190193

hathor/transaction/storage/transaction_storage.py

+5
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,11 @@ def can_validate_full(self, vertex: Vertex) -> bool:
11541154
return True
11551155
return all_exist and all_valid
11561156

1157+
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
1158+
"""Return true if the vertex exists no matter its validation state."""
1159+
with self.allow_partially_validated_context():
1160+
return self.transaction_exists(vertex_id)
1161+
11571162

11581163
class BaseTransactionStorage(TransactionStorage):
11591164
indexes: Optional[IndexesManager]

hathor/vertex_handler/vertex_handler.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from hathor.consensus import ConsensusAlgorithm
2121
from hathor.exception import HathorError, InvalidNewTransaction
2222
from hathor.feature_activation.feature_service import FeatureService
23-
from hathor.p2p.manager import ConnectionsManager
23+
from hathor.profiler import get_cpu_profiler
2424
from hathor.pubsub import HathorEvents, PubSubManager
2525
from hathor.reactor import ReactorProtocol
2626
from hathor.transaction import BaseTransaction, Block
@@ -30,6 +30,7 @@
3030
from hathor.wallet import BaseWallet
3131

3232
logger = get_logger()
33+
cpu = get_cpu_profiler()
3334

3435

3536
class VertexHandler:
@@ -40,7 +41,6 @@ class VertexHandler:
4041
'_tx_storage',
4142
'_verification_service',
4243
'_consensus',
43-
'_p2p_manager',
4444
'_feature_service',
4545
'_pubsub',
4646
'_wallet',
@@ -55,7 +55,6 @@ def __init__(
5555
tx_storage: TransactionStorage,
5656
verification_service: VerificationService,
5757
consensus: ConsensusAlgorithm,
58-
p2p_manager: ConnectionsManager,
5958
feature_service: FeatureService,
6059
pubsub: PubSubManager,
6160
wallet: BaseWallet | None,
@@ -67,27 +66,25 @@ def __init__(
6766
self._tx_storage = tx_storage
6867
self._verification_service = verification_service
6968
self._consensus = consensus
70-
self._p2p_manager = p2p_manager
7169
self._feature_service = feature_service
7270
self._pubsub = pubsub
7371
self._wallet = wallet
7472
self._log_vertex_bytes = log_vertex_bytes
7573

74+
@cpu.profiler('on_new_vertex')
7675
def on_new_vertex(
7776
self,
7877
vertex: BaseTransaction,
7978
*,
8079
quiet: bool = False,
8180
fails_silently: bool = True,
82-
propagate_to_peers: bool = True,
8381
reject_locked_reward: bool = True,
8482
) -> bool:
8583
""" New method for adding transactions or blocks that steps the validation state machine.
8684
8785
:param vertex: transaction to be added
8886
:param quiet: if True will not log when a new tx is accepted
8987
:param fails_silently: if False will raise an exception when tx cannot be added
90-
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
9188
"""
9289
is_valid = self._validate_vertex(
9390
vertex,
@@ -102,7 +99,6 @@ def on_new_vertex(
10299
self._post_consensus(
103100
vertex,
104101
quiet=quiet,
105-
propagate_to_peers=propagate_to_peers,
106102
reject_locked_reward=reject_locked_reward
107103
)
108104

@@ -177,7 +173,6 @@ def _post_consensus(
177173
vertex: BaseTransaction,
178174
*,
179175
quiet: bool,
180-
propagate_to_peers: bool,
181176
reject_locked_reward: bool,
182177
) -> None:
183178
""" Handle operations that need to happen once the tx becomes fully validated.
@@ -208,10 +203,6 @@ def _post_consensus(
208203

209204
self._log_new_object(vertex, 'new {}', quiet=quiet)
210205

211-
if propagate_to_peers:
212-
# Propagate to our peers.
213-
self._p2p_manager.send_tx_to_peers(vertex)
214-
215206
def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
216207
""" A shortcut for logging additional information for block/txs.
217208
"""

0 commit comments

Comments
 (0)