Skip to content

refactor(vertex-handler): remove p2p_manager dependency [part 1/8] #1154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
tx_storage=self._get_or_create_tx_storage(),
verification_service=self._get_or_create_verification_service(),
consensus=self._get_or_create_consensus(),
p2p_manager=self._get_or_create_p2p_manager(),
feature_service=self._get_or_create_feature_service(),
pubsub=self._get_or_create_pubsub(),
wallet=self._get_or_create_wallet(),
Expand Down
1 change: 0 additions & 1 deletion hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
tx_storage=tx_storage,
verification_service=verification_service,
consensus=consensus_algorithm,
p2p_manager=p2p_manager,
feature_service=self.feature_service,
pubsub=pubsub,
wallet=self.wallet,
Expand Down
13 changes: 6 additions & 7 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_id import PeerId
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.reward_lock import is_spent_reward_locked
Expand All @@ -70,7 +69,6 @@
from hathor.websocket.factory import HathorAdminWebsocketFactory

logger = get_logger()
cpu = get_cpu_profiler()


class HathorManager:
Expand Down Expand Up @@ -172,8 +170,6 @@ def __init__(

self.is_started: bool = False

self.cpu = cpu

# XXX: first checkpoint must be genesis (height=0)
self.checkpoints: list[Checkpoint] = checkpoints or []
self.checkpoints_ready: list[bool] = [False] * len(self.checkpoints)
Expand Down Expand Up @@ -962,7 +958,6 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool

return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True)

@cpu.profiler('on_new_tx')
def on_new_tx(
self,
tx: BaseTransaction,
Expand All @@ -979,14 +974,18 @@ def on_new_tx(
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
return self.vertex_handler.on_new_vertex(
success = self.vertex_handler.on_new_vertex(
tx,
quiet=quiet,
fails_silently=fails_silently,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward,
)

if propagate_to_peers and success:
self.connections.send_tx_to_peers(tx)

return success

def has_sync_version_capability(self) -> bool:
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities

Expand Down
10 changes: 7 additions & 3 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,10 @@ def handle_data(self, payload: str) -> None:
self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
# If we have not requested the data, it is a new transaction being propagated
# in the network, thus, we propagate it as well.
result = self.manager.on_new_tx(tx, propagate_to_peers=True)
self.update_received_stats(tx, result)
success = self.manager.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.update_received_stats(tx, success)

def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
""" Update protocol metrics when receiving a new tx
Expand Down Expand Up @@ -685,7 +687,9 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
success = True
else:
# Add tx to the DAG.
success = self.manager.on_new_tx(tx)
success = self.manager.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
# Updating stats data
self.update_received_stats(tx, success)
return tx
Expand Down
20 changes: 9 additions & 11 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,9 @@ def handle_tips(self, payload: str) -> None:
data = [bytes.fromhex(x) for x in data]
# filter-out txs we already have
try:
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
self._receiving_tips.extend(
VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id)
)
except ValueError:
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
Expand Down Expand Up @@ -553,12 +555,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) ->
assert self.protocol.state is not None
self.protocol.state.send_message(cmd, payload)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self,
my_best_block: _HeightInfo,
Expand Down Expand Up @@ -621,11 +617,11 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G
try:
for tx in vertex_list:
if not self.tx_storage.transaction_exists(tx.hash):
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

if not self.tx_storage.transaction_exists(blk.hash):
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')

Expand Down Expand Up @@ -1163,7 +1159,7 @@ def handle_data(self, payload: str) -> None:

tx.storage = self.protocol.node.tx_storage

if self.partial_vertex_exists(tx.hash):
if self.tx_storage.partial_vertex_exists(tx.hash):
# transaction already added to the storage, ignore it
# XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs
self.tx_storage.compare_bytes_with_local_tx(tx)
Expand All @@ -1174,7 +1170,9 @@ def handle_data(self, payload: str) -> None:
if self.tx_storage.can_validate_full(tx):
self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
try:
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=True, fails_silently=False)
success = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')
else:
Expand Down
10 changes: 2 additions & 8 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
Expand Down Expand Up @@ -75,11 +74,6 @@ def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
"""This method is called by the sync agent when a BLOCKS message is received."""
if self._deferred.called:
Expand All @@ -105,7 +99,7 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
Expand All @@ -132,7 +126,7 @@ def handle_blocks(self, blk: Block) -> None:

if self.tx_storage.can_validate_full(blk):
try:
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except HathorError:
self.fails(InvalidVertexError(blk.hash.hex()))
return
Expand Down
4 changes: 3 additions & 1 deletion hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def _add_tx(self, tx: BaseTransaction) -> None:
if self.tx_storage.transaction_exists(tx.hash):
return
try:
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
success = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.sync_agent.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received')
raise
7 changes: 5 additions & 2 deletions hathor/profiler/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import time
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, Union
from typing import Callable, ParamSpec, TypeVar, Union

from twisted.internet.task import LoopingCall

Key = tuple[str, ...]

T = TypeVar('T')
P = ParamSpec('P')


class ProcItem:
"""Store information for each process."""
Expand Down Expand Up @@ -184,7 +187,7 @@ def update(self) -> None:
t1 = time.process_time()
self.measures[('profiler',)].add_time(t1 - t0)

def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[..., Any]], Any]:
def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Decorator to collect data. The `key` must be the key itself
or a method that returns the key.

Expand Down
5 changes: 5 additions & 0 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,11 @@ def can_validate_full(self, vertex: Vertex) -> bool:
return True
return all_exist and all_valid

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.allow_partially_validated_context():
return self.transaction_exists(vertex_id)


class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]
Expand Down
15 changes: 3 additions & 12 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from hathor.consensus import ConsensusAlgorithm
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.feature_activation.feature_service import FeatureService
from hathor.p2p.manager import ConnectionsManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol
from hathor.transaction import BaseTransaction, Block
Expand All @@ -30,6 +30,7 @@
from hathor.wallet import BaseWallet

logger = get_logger()
cpu = get_cpu_profiler()


class VertexHandler:
Expand All @@ -40,7 +41,6 @@ class VertexHandler:
'_tx_storage',
'_verification_service',
'_consensus',
'_p2p_manager',
'_feature_service',
'_pubsub',
'_wallet',
Expand All @@ -55,7 +55,6 @@ def __init__(
tx_storage: TransactionStorage,
verification_service: VerificationService,
consensus: ConsensusAlgorithm,
p2p_manager: ConnectionsManager,
feature_service: FeatureService,
pubsub: PubSubManager,
wallet: BaseWallet | None,
Expand All @@ -67,27 +66,25 @@ def __init__(
self._tx_storage = tx_storage
self._verification_service = verification_service
self._consensus = consensus
self._p2p_manager = p2p_manager
self._feature_service = feature_service
self._pubsub = pubsub
self._wallet = wallet
self._log_vertex_bytes = log_vertex_bytes

@cpu.profiler('on_new_vertex')
def on_new_vertex(
self,
vertex: BaseTransaction,
*,
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True,
) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.

:param vertex: transaction to be added
:param quiet: if True will not log when a new tx is accepted
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
is_valid = self._validate_vertex(
vertex,
Expand All @@ -102,7 +99,6 @@ def on_new_vertex(
self._post_consensus(
vertex,
quiet=quiet,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward
)

Expand Down Expand Up @@ -177,7 +173,6 @@ def _post_consensus(
vertex: BaseTransaction,
*,
quiet: bool,
propagate_to_peers: bool,
reject_locked_reward: bool,
) -> None:
""" Handle operations that need to happen once the tx becomes fully validated.
Expand Down Expand Up @@ -208,10 +203,6 @@ def _post_consensus(

self._log_new_object(vertex, 'new {}', quiet=quiet)

if propagate_to_peers:
# Propagate to our peers.
self._p2p_manager.send_tx_to_peers(vertex)

def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
""" A shortcut for logging additional information for block/txs.
"""
Expand Down
Loading