
refactor(p2p): minor pre-IPC refactors [part 5/11] #1165


Open
wants to merge 2 commits into base: refactor/remove-vertex-storage-protocol
14 changes: 11 additions & 3 deletions hathor/builder/builder.py
@@ -140,7 +140,7 @@ def __init__(self) -> None:
self._settings: Optional[HathorSettingsType] = None
self._rng: Random = Random()
self._checkpoints: Optional[list[Checkpoint]] = None
self._capabilities: Optional[list[str]] = None
self._capabilities: Optional[tuple[str, ...]] = None

self._peer: Optional[PrivatePeer] = None
self._cmdline: str = ''
@@ -235,6 +235,7 @@ def build(self) -> BuildArtifacts:
vertex_handler = self._get_or_create_vertex_handler()
vertex_parser = self._get_or_create_vertex_parser()
poa_block_producer = self._get_or_create_poa_block_producer()
capabilities = self._get_or_create_capabilities()

if self._enable_address_index:
indexes.enable_address_index(pubsub)
@@ -266,7 +267,7 @@ def build(self) -> BuildArtifacts:
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
capabilities=capabilities,
environment_info=get_environment_info(self._cmdline, str(peer.id)),
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
@@ -335,7 +336,7 @@ def set_checkpoints(self, checkpoints: list[Checkpoint]) -> 'Builder':
self._checkpoints = checkpoints
return self

def set_capabilities(self, capabilities: list[str]) -> 'Builder':
def set_capabilities(self, capabilities: tuple[str, ...]) -> 'Builder':
self.check_if_can_modify()
self._capabilities = capabilities
return self
@@ -647,6 +648,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def _get_or_create_capabilities(self) -> tuple[str, ...]:
if self._capabilities is None:
settings = self._get_or_create_settings()
self._capabilities = settings.get_default_capabilities()

return self._capabilities

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
2 changes: 2 additions & 0 deletions hathor/builder/cli_builder.py
@@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

cpu_mining_service = CpuMiningService()
capabilities = settings.get_default_capabilities()

p2p_manager = ConnectionsManager(
settings=settings,
@@ -386,6 +387,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
vertex_handler=vertex_handler,
vertex_parser=vertex_parser,
poa_block_producer=poa_block_producer,
capabilities=capabilities,
)

if self._args.x_ipython_kernel:
9 changes: 9 additions & 0 deletions hathor/conf/settings.py
@@ -458,6 +458,15 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
validators=_VALIDATORS
)

def get_default_capabilities(self) -> tuple[str, ...]:
"""Return the default capabilities."""
return (
self.CAPABILITY_WHITELIST,
self.CAPABILITY_SYNC_VERSION,
self.CAPABILITY_GET_BEST_BLOCKCHAIN,
self.CAPABILITY_IPV6,
)


def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""
16 changes: 2 additions & 14 deletions hathor/manager.py
@@ -109,9 +109,9 @@ def __init__(
execution_manager: ExecutionManager,
vertex_handler: VertexHandler,
vertex_parser: VertexParser,
capabilities: tuple[str, ...],
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
checkpoints: Optional[list[Checkpoint]] = None,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
@@ -231,10 +231,7 @@ def __init__(
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
else:
self.capabilities = self.get_default_capabilities()
self.capabilities = capabilities

# This is included in some logs to provide more context
self.environment_info = environment_info
@@ -246,15 +243,6 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
return [
self._settings.CAPABILITY_WHITELIST,
self._settings.CAPABILITY_SYNC_VERSION,
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN,
self._settings.CAPABILITY_IPV6,
]

def start(self) -> None:
""" A factory must be started only once. And it is usually automatically started.
"""
20 changes: 11 additions & 9 deletions hathor/metrics.py
@@ -248,23 +248,25 @@ def collect_peer_connection_metrics(self) -> None:
self.peer_connection_metrics.clear()

for connection in self.connections.get_connected_peers():
if not connection._peer:
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
# So we can just discard it for the sake of the metrics
continue

metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.addr),
peer_id=str(connection.peer.id),
network=settings.NETWORK_NAME,
received_messages=connection.metrics.received_messages,
sent_messages=connection.metrics.sent_messages,
received_bytes=connection.metrics.received_bytes,
sent_bytes=connection.metrics.sent_bytes,
received_txs=connection.metrics.received_txs,
discarded_txs=connection.metrics.discarded_txs,
received_blocks=connection.metrics.received_blocks,
discarded_blocks=connection.metrics.discarded_blocks,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
received_bytes=metrics.received_bytes,
sent_bytes=metrics.sent_bytes,
received_txs=metrics.received_txs,
discarded_txs=metrics.discarded_txs,
received_blocks=metrics.received_blocks,
discarded_blocks=metrics.discarded_blocks,
)

self.peer_connection_metrics.append(metric)
49 changes: 37 additions & 12 deletions hathor/p2p/manager.py
@@ -330,9 +330,7 @@ def has_synced_peer(self) -> bool:
"""
connections = list(self.iter_ready_connections())
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
if conn.state.is_synced():
if conn.is_synced():
return True
return False

@@ -349,9 +347,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
connections = list(self.iter_ready_connections())
self.rng.shuffle(connections)
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
conn.state.send_tx_to_peer(tx)
conn.send_tx_to_peer(tx)

def disconnect_all_peers(self, *, force: bool = False) -> None:
"""Disconnect all peers."""
@@ -416,10 +412,9 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
"""Relay peer to all ready connections."""
for conn in self.iter_ready_connections():
if conn.peer == peer:
if conn.get_peer() == peer:
continue
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([peer])
conn.send_peers([peer])

def on_handshake_disconnect(self, *, addr: PeerAddress) -> None:
"""Called when a peer disconnects from a handshaking state (HELLO or PEER-ID)."""
@@ -694,9 +689,9 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
def drop_connection(self, protocol: HathorProtocol) -> None:
""" Drop a connection
"""
assert protocol.peer is not None
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection dropped')
protocol_peer = protocol.get_peer()
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection dropped')

def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
""" Drop a connection by peer id
@@ -795,3 +790,33 @@ def reload_entrypoints_and_connections(self) -> None:
self.log.warn('Killing all connections and resetting entrypoints...')
self.disconnect_all_peers(force=True)
self.my_peer.reload_entrypoints_from_source_file()

def get_peers_whitelist(self) -> list[PeerId]:
"""
Return a list of `PeerId`s in the whitelist.
This is implemented to conform with the P2PManagerProtocol.
"""
assert self.manager is not None
return self.manager.peers_whitelist

def get_verified_peers(self) -> Iterable[PublicPeer]:
"""
Return an iterable of `PublicPeer`s that are verified.
This is implemented to conform with the P2PManagerProtocol.
"""
return self.verified_peer_storage.values()

def get_randbytes(self, n: int) -> bytes:
"""
Generate `n` random bytes.
This is implemented to conform with the P2PManagerProtocol.
"""
return self.rng.randbytes(n)

def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
"""
Return whether `peer_id` is whitelisted.
This is implemented to conform with the P2PManagerProtocol.
"""
assert self.manager is not None
return peer_id in self.manager.peers_whitelist
48 changes: 47 additions & 1 deletion hathor/p2p/protocol.py
@@ -14,7 +14,7 @@

import time
from enum import Enum
from typing import TYPE_CHECKING, Optional, cast
from typing import TYPE_CHECKING, Iterable, Optional, cast

from structlog import get_logger
from twisted.internet import defer
@@ -34,6 +34,7 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import format_address
from hathor.profiler import get_cpu_profiler
from hathor.transaction import BaseTransaction

if TYPE_CHECKING:
from hathor.manager import HathorManager # noqa: F401
@@ -410,6 +411,51 @@ def disable_sync(self) -> None:
self.log.info('disable sync')
self.state.sync_agent.disable_sync()

def is_synced(self) -> bool:
"""
Return whether this protocol is synced.
This is implemented to conform with the P2PConnectionProtocol.
"""
assert isinstance(self.state, ReadyState)
return self.state.is_synced()

def send_tx_to_peer(self, tx: BaseTransaction) -> None:
"""
Send a tx to the peer in this protocol, assuming it's ready.
This is implemented to conform with the P2PConnectionProtocol.
"""
assert isinstance(self.state, ReadyState)
return self.state.send_tx_to_peer(tx)

def get_peer(self) -> PublicPeer:
"""
Return this protocol's `PublicPeer`.
This is implemented to conform with the P2PConnectionProtocol.
"""
return self.peer

def get_peer_if_set(self) -> PublicPeer | None:
"""
Return this protocol's `PublicPeer` if it's set, `None` otherwise.
This is implemented to conform with the P2PConnectionProtocol.
"""
return self._peer

def send_peers(self, peers: Iterable[PublicPeer]) -> None:
"""
Send peers to the peer in this protocol, assuming it's ready.
This is implemented to conform with the P2PConnectionProtocol.
"""
assert isinstance(self.state, ReadyState)
self.state.send_peers(peers)

def get_metrics(self) -> 'ConnectionMetrics':
"""
Return this protocol's metrics.
This is implemented to conform with the P2PConnectionProtocol.
"""
return self.metrics


class HathorLineReceiver(LineReceiver, HathorProtocol):
""" Implements HathorProtocol in a LineReceiver protocol.
2 changes: 1 addition & 1 deletion hathor/p2p/states/peer_id.py
@@ -158,7 +158,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:

Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
5 changes: 2 additions & 3 deletions hathor/p2p/states/ready.py
@@ -158,7 +158,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.verified_peer_storage.values():
for peer in self.protocol.connections.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
@@ -206,8 +206,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
rng = self.protocol.connections.rng
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
A project member commented:

It seems better to simply pass an rng to the protocol, doesn't it?

The PR author (contributor) replied:

The RNG should be the same in the ConnectionsManager and in HathorProtocol, right? At least for simulation purposes.

If that's the case, we can't simply pass the RNG between them because now the protocol will live in a different process. Otherwise, if we can have different RNGs, then yes, it would be simpler to just have its own RNG here.

The member replied:

We could use a "derived" rng instance, where rng in the main process just generates a seed that is used to instantiate the rng in the subprocess.

There's really only one reason for using an rng instance instead of a global rng, and that is using seeds to reproduce test cases. It's important for reproducing rare cases that happen randomly in tests.

Considering there's parallelism, some events can happen in a different order regardless of the initial seed, and this will affect how we generate random bytes whether we query the main thread for them or generate them locally. In that sense, I think generating them locally is better because it avoids an IPC call. And if we really need to, we can focus on avoiding that parallelism in tests or simulating it in a reproducible way.
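
For illustration, a minimal sketch of this "derived rng" idea using the stdlib random.Random (the helper names and the salt size below are hypothetical, not hathor-core API): the main process hands only a seed across the IPC boundary, and the subprocess builds its own rng from it.

import random

def derive_seed(parent_rng: random.Random) -> int:
    # Runs in the main process; only this integer needs to cross the IPC boundary.
    return parent_rng.getrandbits(64)

def build_subprocess_rng(seed: int) -> random.Random:
    # Runs in the subprocess; its output is fully determined by the received seed.
    return random.Random(seed)

# Usage: with a fixed parent seed, the derived rng is reproducible as well.
parent_rng = random.Random(1234)
child_rng = build_subprocess_rng(derive_seed(parent_rng))
ping_salt = child_rng.randbytes(32).hex()  # 32 is an illustrative salt size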

self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
12 changes: 5 additions & 7 deletions hathor/p2p/sync_v2/agent.py
@@ -40,6 +40,7 @@
from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.genesis import is_genesis
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.vertex_parser import VertexParser
from hathor.types import VertexId
@@ -383,7 +384,7 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
# Not synced but same blockchain?
if self.peer_best_block.height <= my_best_block.height:
# Is peer behind me at the same blockchain?
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
common_block_hash = self.tx_storage.get_block_id_by_height(self.peer_best_block.height)
if common_block_hash == self.peer_best_block.id:
# If yes, nothing to sync from this peer.
if not self.is_synced():
@@ -461,15 +462,13 @@ def send_get_tips(self) -> None:
def handle_get_tips(self, _payload: str) -> None:
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
if self._is_streaming:
self.log.warn('can\'t send while streaming') # XXX: or can we?
self.send_message(ProtocolMessages.MEMPOOL_END)
return
self.log.debug('handle_get_tips')
# TODO Use a streaming of tips
for tx_id in self.tx_storage.indexes.mempool_tips.get():
for tx_id in self.tx_storage.get_mempool_tips():
self.send_tips(tx_id)
self.log.debug('tips end')
self.send_message(ProtocolMessages.TIPS_END)
@@ -645,15 +644,14 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
def handle_get_peer_block_hashes(self, payload: str) -> None:
""" Handle a GET-PEER-BLOCK-HASHES message.
"""
assert self.tx_storage.indexes is not None
heights = json.loads(payload)
if len(heights) > 20:
self.log.info('too many heights', heights_qty=len(heights))
self.protocol.send_error_and_close_connection('GET-PEER-BLOCK-HASHES: too many heights')
return
data = []
for h in heights:
blk_hash = self.tx_storage.indexes.height.get(h)
blk_hash = self.tx_storage.get_block_id_by_height(h)
if blk_hash is None:
break
blk = self.tx_storage.get_transaction(blk_hash)
@@ -1154,7 +1152,7 @@ def handle_data(self, payload: str) -> None:
return

assert tx is not None
if self.protocol.node.tx_storage.get_genesis(tx.hash):
if is_genesis(tx.hash, settings=self._settings):
# We just got the data of a genesis tx/block. What should we do?
# Will it reduce peer reputation score?
return