Skip to content

refactor(p2p): implement P2PDependencies class [part 6/11] #1152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: refactor/p2p/pre-ipc-refactors
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p import P2PDependencies
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.pubsub import PubSubManager
Expand Down Expand Up @@ -64,12 +65,10 @@ class SyncSupportLevel(IntEnum):
@classmethod
def add_factories(
cls,
settings: HathorSettingsType,
p2p_manager: ConnectionsManager,
dependencies: P2PDependencies,
sync_v1_support: 'SyncSupportLevel',
sync_v2_support: 'SyncSupportLevel',
vertex_parser: VertexParser,
vertex_handler: VertexHandler,
) -> None:
"""Adds the sync factory to the manager according to the support level."""
from hathor.p2p.sync_v1.factory import SyncV11Factory
Expand All @@ -78,18 +77,12 @@ def add_factories(

# sync-v1 support:
if sync_v1_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager, vertex_parser=vertex_parser))
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies))
if sync_v1_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V1_1)
# sync-v2 support:
if sync_v2_support > cls.UNAVAILABLE:
sync_v2_factory = SyncV2Factory(
settings,
p2p_manager,
vertex_parser=vertex_parser,
vertex_handler=vertex_handler,
)
p2p_manager.add_sync_factory(SyncVersion.V2, sync_v2_factory)
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies))
if sync_v2_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V2)

Expand Down Expand Up @@ -263,7 +256,6 @@ def build(self) -> BuildArtifacts:
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
environment_info=get_environment_info(self._cmdline, str(peer.id)),
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
Expand Down Expand Up @@ -415,25 +407,31 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
return self._p2p_manager

enable_ssl = True
reactor = self._get_reactor()
my_peer = self._get_peer()

self._p2p_manager = ConnectionsManager(
dependencies = P2PDependencies(
reactor=self._get_reactor(),
settings=self._get_or_create_settings(),
reactor=reactor,
vertex_parser=self._get_or_create_vertex_parser(),
tx_storage=self._get_or_create_tx_storage(),
vertex_handler=self._get_or_create_vertex_handler(),
verification_service=self._get_or_create_verification_service(),
capabilities=self._get_or_create_capabilities(),
whitelist_only=False,
)

self._p2p_manager = ConnectionsManager(
dependencies=dependencies,
my_peer=my_peer,
pubsub=self._get_or_create_pubsub(),
ssl=enable_ssl,
whitelist_only=False,
rng=self._rng,
)
SyncSupportLevel.add_factories(
self._get_or_create_settings(),
self._p2p_manager,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
self._get_or_create_vertex_parser(),
self._get_or_create_vertex_handler(),
)
return self._p2p_manager

Expand Down Expand Up @@ -642,6 +640,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def _get_or_create_capabilities(self) -> list[str]:
    """Lazily resolve the node's capability list.

    On first call, falls back to the default capabilities declared by the
    settings object; subsequent calls return the cached list unchanged.
    """
    if self._capabilities is None:
        self._capabilities = self._get_or_create_settings().get_default_capabilities()
    return self._capabilities

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
Expand Down
35 changes: 22 additions & 13 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p import P2PDependencies
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_endpoint import PeerEndpoint
Expand Down Expand Up @@ -317,16 +318,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

cpu_mining_service = CpuMiningService()

p2p_manager = ConnectionsManager(
settings=settings,
reactor=reactor,
my_peer=peer,
pubsub=pubsub,
ssl=True,
whitelist_only=False,
rng=Random(),
)
capabilities = settings.get_default_capabilities()

vertex_handler = VertexHandler(
reactor=reactor,
Expand All @@ -340,13 +332,30 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
log_vertex_bytes=self._args.log_vertex_bytes,
)

p2p_dependencies = P2PDependencies(
reactor=reactor,
settings=settings,
vertex_parser=vertex_parser,
tx_storage=tx_storage,
vertex_handler=vertex_handler,
verification_service=verification_service,
whitelist_only=False,
capabilities=capabilities,
)

p2p_manager = ConnectionsManager(
dependencies=p2p_dependencies,
my_peer=peer,
pubsub=pubsub,
ssl=True,
rng=Random(),
)

SyncSupportLevel.add_factories(
settings,
p2p_manager,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
vertex_parser,
vertex_handler,
)

from hathor.consensus.poa import PoaBlockProducer, PoaSignerFile
Expand Down
14 changes: 9 additions & 5 deletions hathor/cli/quick_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import os
from argparse import ArgumentParser
from typing import Any
from typing import TYPE_CHECKING

from structlog import get_logger

from hathor.cli.run_node import RunNode

if TYPE_CHECKING:
from hathor.transaction import Vertex

logger = get_logger()


Expand All @@ -30,18 +35,17 @@ def __init__(self, vertex_handler, manager, n_blocks):
self._manager = manager
self._n_blocks = n_blocks

def on_new_vertex(self, *args: Any, **kwargs: Any) -> bool:
def on_new_vertex(self, vertex: Vertex, *, fails_silently: bool) -> bool:
from hathor.transaction import Block
from hathor.transaction.base_transaction import GenericVertex

msg: str | None = None
res = self._vertex_handler.on_new_vertex(*args, **kwargs)
res = self._vertex_handler.on_new_vertex(vertex=vertex, fails_silently=fails_silently)

if self._n_blocks is None:
should_quit = res
msg = 'added a tx'
else:
vertex = args[0]
should_quit = False
assert isinstance(vertex, GenericVertex)

Expand Down Expand Up @@ -77,7 +81,7 @@ def prepare(self, *, register_resources: bool = True) -> None:
self.log.info('patching vertex_handler.on_new_vertex to quit on success')
p2p_factory = self.manager.connections.get_sync_factory(SyncVersion.V2)
assert isinstance(p2p_factory, SyncV2Factory)
p2p_factory.vertex_handler = VertexHandlerWrapper(
p2p_factory.dependencies.vertex_handler = VertexHandlerWrapper(
self.manager.vertex_handler,
self.manager,
self._args.quit_after_n_blocks,
Expand Down
8 changes: 8 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,14 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
validators=_VALIDATORS
)

def get_default_capabilities(self) -> list[str]:
    """Return the default capabilities."""
    # Order is preserved: whitelist, sync-version, then get-best-blockchain.
    return list((
        self.CAPABILITY_WHITELIST,
        self.CAPABILITY_SYNC_VERSION,
        self.CAPABILITY_GET_BEST_BLOCKCHAIN,
    ))


def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""
Expand Down
4 changes: 2 additions & 2 deletions hathor/consensus/block_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def remove_first_block_markers(self, block: Block) -> None:
storage = block.storage

from hathor.transaction.storage.traversal import BFSTimestampWalk
bfs = BFSTimestampWalk(storage, is_dag_verifications=True, is_left_to_right=False)
bfs = BFSTimestampWalk(storage.get_vertex, is_dag_verifications=True, is_left_to_right=False)
for tx in bfs.run(block, skip_root=True):
if tx.is_block:
bfs.skip_neighbors(tx)
Expand Down Expand Up @@ -469,7 +469,7 @@ def _score_block_dfs(self, block: BaseTransaction, used: set[bytes],

else:
from hathor.transaction.storage.traversal import BFSTimestampWalk
bfs = BFSTimestampWalk(storage, is_dag_verifications=True, is_left_to_right=False)
bfs = BFSTimestampWalk(storage.get_vertex, is_dag_verifications=True, is_left_to_right=False)
for tx in bfs.run(parent, skip_root=False):
assert not tx.is_block

Expand Down
6 changes: 4 additions & 2 deletions hathor/consensus/transaction_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ def remove_voided_by(self, tx: Transaction, voided_hash: bytes) -> bool:

self.log.debug('remove_voided_by', tx=tx.hash_hex, voided_hash=voided_hash.hex())

bfs = BFSTimestampWalk(tx.storage, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=True)
bfs = BFSTimestampWalk(
tx.storage.get_vertex, is_dag_funds=True, is_dag_verifications=True, is_left_to_right=True
)
check_list: list[BaseTransaction] = []
for tx2 in bfs.run(tx, skip_root=False):
assert tx2.storage is not None
Expand Down Expand Up @@ -400,7 +402,7 @@ def add_voided_by(self, tx: Transaction, voided_hash: bytes) -> bool:
is_dag_verifications = False

from hathor.transaction.storage.traversal import BFSTimestampWalk
bfs = BFSTimestampWalk(tx.storage, is_dag_funds=True, is_dag_verifications=is_dag_verifications,
bfs = BFSTimestampWalk(tx.storage.get_vertex, is_dag_funds=True, is_dag_verifications=is_dag_verifications,
is_left_to_right=True)
check_list: list[Transaction] = []
for tx2 in bfs.run(tx, skip_root=False):
Expand Down
2 changes: 1 addition & 1 deletion hathor/indexes/mempool_tips_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def iter(self, tx_storage: 'TransactionStorage', max_timestamp: Optional[float]

def iter_all(self, tx_storage: 'TransactionStorage') -> Iterator[Transaction]:
from hathor.transaction.storage.traversal import BFSTimestampWalk
bfs = BFSTimestampWalk(tx_storage, is_dag_verifications=True, is_left_to_right=False)
bfs = BFSTimestampWalk(tx_storage.get_vertex, is_dag_verifications=True, is_left_to_right=False)
for tx in bfs.run(self.iter(tx_storage), skip_root=False):
assert isinstance(tx, Transaction)
if tx.get_metadata().first_block is not None:
Expand Down
18 changes: 0 additions & 18 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def __init__(
vertex_parser: VertexParser,
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
checkpoints: Optional[list[Checkpoint]] = None,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
Expand Down Expand Up @@ -230,12 +229,6 @@ def __init__(
# List of whitelisted peers
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
else:
self.capabilities = self.get_default_capabilities()

# This is included in some logs to provide more context
self.environment_info = environment_info

Expand All @@ -246,14 +239,6 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
return [
self._settings.CAPABILITY_WHITELIST,
self._settings.CAPABILITY_SYNC_VERSION,
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN
]

def start(self) -> None:
""" A factory must be started only once. And it is usually automatically started.
"""
Expand Down Expand Up @@ -986,9 +971,6 @@ def on_new_tx(

return success

def has_sync_version_capability(self) -> bool:
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities

def add_peer_to_whitelist(self, peer_id: PeerId) -> None:
if not self._settings.ENABLE_PEER_WHITELIST:
return
Expand Down
30 changes: 16 additions & 14 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ class Metrics:
# Variables to store the last block when we updated the RocksDB storage metrics
last_txstorage_data_block: Optional[int] = None

# Peers connected
connected_peers: int = 0
# Peers ready
ready_peers: int = 0
# Peers handshaking
handshaking_peers: int = 0
# Peers connecting
Expand Down Expand Up @@ -200,7 +200,7 @@ def handle_publish(self, key: HathorEvents, args: EventArguments) -> None:
):
peers_connection_metrics: PeerConnectionsMetrics = data["peers_count"]

self.connected_peers = peers_connection_metrics.connected_peers_count
self.ready_peers = peers_connection_metrics.ready_peers_count
self.connecting_peers = peers_connection_metrics.connecting_peers_count
self.handshaking_peers = peers_connection_metrics.handshaking_peers_count
self.known_peers = peers_connection_metrics.known_peers_count
Expand Down Expand Up @@ -247,24 +247,26 @@ def collect_peer_connection_metrics(self) -> None:
"""
self.peer_connection_metrics.clear()

for connection in self.connections.connections:
if not connection._peer:
for connection in self.connections.get_connected_peers():
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
# So we can just discard it for the sake of the metrics
continue

metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.entrypoint) if connection.entrypoint else "",
connection_string=str(connection.addr),
peer_id=str(connection.peer.id),
network=settings.NETWORK_NAME,
received_messages=connection.metrics.received_messages,
sent_messages=connection.metrics.sent_messages,
received_bytes=connection.metrics.received_bytes,
sent_bytes=connection.metrics.sent_bytes,
received_txs=connection.metrics.received_txs,
discarded_txs=connection.metrics.discarded_txs,
received_blocks=connection.metrics.received_blocks,
discarded_blocks=connection.metrics.discarded_blocks,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
received_bytes=metrics.received_bytes,
sent_bytes=metrics.sent_bytes,
received_txs=metrics.received_txs,
discarded_txs=metrics.discarded_txs,
received_blocks=metrics.received_blocks,
discarded_blocks=metrics.discarded_blocks,
)

self.peer_connection_metrics.append(metric)
Expand Down
Loading
Loading