Skip to content

refactor(p2p): refactor peer address handling [part 1/10] #1173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_endpoint import PeerEndpoint
from hathor.p2p.utils import discover_hostname, get_genesis_short_hash
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
Expand Down Expand Up @@ -420,7 +420,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))

if self._args.bootstrap:
entrypoints = [Entrypoint.parse(desc) for desc in self._args.bootstrap]
entrypoints = [PeerEndpoint.parse(desc) for desc in self._args.bootstrap]
p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(entrypoints))

if self._args.x_rocksdb_indexes:
Expand Down
215 changes: 0 additions & 215 deletions hathor/p2p/entrypoint.py

This file was deleted.

48 changes: 26 additions & 22 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from twisted.web.client import Agent

from hathor.conf.settings import HathorSettings
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
Expand Down Expand Up @@ -60,7 +60,7 @@ class _SyncRotateInfo(NamedTuple):


class _ConnectingPeer(NamedTuple):
entrypoint: Entrypoint
entrypoint: PeerEndpoint
endpoint_deferred: Deferred


Expand Down Expand Up @@ -370,7 +370,7 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer
endpoint: IStreamClientEndpoint) -> None:
connecting_peer = self.connecting_peers[endpoint]
entrypoint = connecting_peer.entrypoint
self.log.warn('connection failure', entrypoint=entrypoint, failure=failure.getErrorMessage())
self.log.warn('connection failure', entrypoint=str(entrypoint), failure=failure.getErrorMessage())
self.connecting_peers.pop(endpoint)

self.pubsub.publish(
Expand Down Expand Up @@ -475,7 +475,7 @@ def iter_ready_connections(self) -> Iterable[HathorProtocol]:
for conn in self.connected_peers.values():
yield conn

def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]:
def iter_not_ready_endpoints(self) -> Iterable[PeerEndpoint]:
"""Iterate over not-ready connections."""
for connecting_peer in self.connecting_peers.values():
yield connecting_peer.entrypoint
Expand Down Expand Up @@ -589,27 +589,28 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in

assert peer.id is not None
if peer.info.can_retry(now):
self.connect_to(self.rng.choice(peer.info.entrypoints), peer)
addr = self.rng.choice(peer.info.entrypoints)
self.connect_to(addr.with_id(peer.id), peer)

def _connect_to_callback(
self,
protocol: IProtocol,
peer: Optional[UnverifiedPeer | PublicPeer],
peer: UnverifiedPeer | PublicPeer | None,
endpoint: IStreamClientEndpoint,
entrypoint: Entrypoint,
entrypoint: PeerEndpoint,
) -> None:
"""Called when we successfully connect to a peer."""
if isinstance(protocol, HathorProtocol):
protocol.on_outbound_connect(entrypoint)
protocol.on_outbound_connect(entrypoint, peer)
else:
assert isinstance(protocol, TLSMemoryBIOProtocol)
assert isinstance(protocol.wrappedProtocol, HathorProtocol)
protocol.wrappedProtocol.on_outbound_connect(entrypoint)
protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer)
self.connecting_peers.pop(endpoint)

def connect_to(
self,
entrypoint: Entrypoint,
entrypoint: PeerEndpoint,
peer: UnverifiedPeer | PublicPeer | None = None,
use_ssl: bool | None = None,
) -> None:
Expand All @@ -618,24 +619,27 @@ def connect_to(

If `use_ssl` is True, then the connection will be wraped by a TLS.
"""
if entrypoint.peer_id is not None and peer is not None and str(entrypoint.peer_id) != peer.id:
if entrypoint.peer_id is not None and peer is not None and entrypoint.peer_id != peer.id:
self.log.debug('skipping because the entrypoint peer_id does not match the actual peer_id',
entrypoint=entrypoint)
entrypoint=str(entrypoint))
return

for connecting_peer in self.connecting_peers.values():
if connecting_peer.entrypoint.equals_ignore_peer_id(entrypoint):
self.log.debug('skipping because we are already connecting to this endpoint', entrypoint=entrypoint)
if connecting_peer.entrypoint.addr == entrypoint.addr:
self.log.debug(
'skipping because we are already connecting to this endpoint',
entrypoint=str(entrypoint),
)
return

if self.localhost_only and not entrypoint.is_localhost():
self.log.debug('skip because of simple localhost check', entrypoint=entrypoint)
if self.localhost_only and not entrypoint.addr.is_localhost():
self.log.debug('skip because of simple localhost check', entrypoint=str(entrypoint))
return

if use_ssl is None:
use_ssl = self.use_ssl

endpoint = entrypoint.to_client_endpoint(self.reactor)
endpoint = entrypoint.addr.to_client_endpoint(self.reactor)

factory: IProtocolFactory
if use_ssl:
Expand All @@ -650,9 +654,9 @@ def connect_to(
deferred = endpoint.connect(factory)
self.connecting_peers[endpoint] = _ConnectingPeer(entrypoint, deferred)

deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) # type: ignore
deferred.addErrback(self.on_connection_failure, peer, endpoint) # type: ignore
self.log.info('connect to', entrypoint=str(entrypoint), peer=str(peer))
deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint)
deferred.addErrback(self.on_connection_failure, peer, endpoint)
self.log.info('connecting to', entrypoint=str(entrypoint), peer=str(peer))
self.pubsub.publish(
HathorEvents.NETWORK_PEER_CONNECTING,
peer=peer,
Expand Down Expand Up @@ -708,13 +712,13 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname:
assert self.manager is not None
for address in self._listen_addresses:
if old_hostname is not None:
old_entrypoint = Entrypoint.from_hostname_address(old_hostname, address)
old_entrypoint = PeerAddress.from_hostname_address(old_hostname, address)
if old_entrypoint in self.my_peer.info.entrypoints:
self.my_peer.info.entrypoints.remove(old_entrypoint)
self._add_hostname_entrypoint(new_hostname, address)

def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None:
hostname_entrypoint = Entrypoint.from_hostname_address(hostname, address)
hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address)
self.my_peer.info.entrypoints.append(hostname_entrypoint)

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
Expand Down
Loading
Loading