Skip to content

Commit dfa95ea

Browse files
committed
refactor(p2p): minor pre-IPC refactors
1 parent 5234061 commit dfa95ea

13 files changed

+124
-76
lines changed

hathor/builder/builder.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ def build(self) -> BuildArtifacts:
232232
vertex_handler = self._get_or_create_vertex_handler()
233233
vertex_parser = self._get_or_create_vertex_parser()
234234
poa_block_producer = self._get_or_create_poa_block_producer()
235+
capabilities = self._get_or_create_capabilities()
235236

236237
if self._enable_address_index:
237238
indexes.enable_address_index(pubsub)
@@ -263,7 +264,7 @@ def build(self) -> BuildArtifacts:
263264
wallet=wallet,
264265
rng=self._rng,
265266
checkpoints=self._checkpoints,
266-
capabilities=self._capabilities,
267+
capabilities=capabilities,
267268
environment_info=get_environment_info(self._cmdline, str(peer.id)),
268269
bit_signaling_service=bit_signaling_service,
269270
verification_service=verification_service,
@@ -642,6 +643,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:
642643

643644
return self._poa_block_producer
644645

646+
def _get_or_create_capabilities(self) -> list[str]:
647+
if self._capabilities is None:
648+
settings = self._get_or_create_settings()
649+
self._capabilities = settings.get_default_capabilities()
650+
651+
return self._capabilities
652+
645653
def use_memory(self) -> 'Builder':
646654
self.check_if_can_modify()
647655
self._storage_type = StorageType.MEMORY

hathor/builder/cli_builder.py

+2
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
317317
)
318318

319319
cpu_mining_service = CpuMiningService()
320+
capabilities = settings.get_default_capabilities()
320321

321322
p2p_manager = ConnectionsManager(
322323
settings=settings,
@@ -384,6 +385,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
384385
vertex_handler=vertex_handler,
385386
vertex_parser=vertex_parser,
386387
poa_block_producer=poa_block_producer,
388+
capabilities=capabilities,
387389
)
388390

389391
if self._args.x_ipython_kernel:

hathor/conf/settings.py

+8
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,14 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
457457
validators=_VALIDATORS
458458
)
459459

460+
def get_default_capabilities(self) -> list[str]:
461+
"""Return the default capabilities."""
462+
return [
463+
self.CAPABILITY_WHITELIST,
464+
self.CAPABILITY_SYNC_VERSION,
465+
self.CAPABILITY_GET_BEST_BLOCKCHAIN
466+
]
467+
460468

461469
def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
462470
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""

hathor/manager.py

+2-13
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ def __init__(
108108
execution_manager: ExecutionManager,
109109
vertex_handler: VertexHandler,
110110
vertex_parser: VertexParser,
111+
capabilities: list[str],
111112
hostname: Optional[str] = None,
112113
wallet: Optional[BaseWallet] = None,
113-
capabilities: Optional[list[str]] = None,
114114
checkpoints: Optional[list[Checkpoint]] = None,
115115
rng: Optional[Random] = None,
116116
environment_info: Optional[EnvironmentInfo] = None,
@@ -230,10 +230,7 @@ def __init__(
230230
self.peers_whitelist: list[PeerId] = []
231231

232232
# List of capabilities of the peer
233-
if capabilities is not None:
234-
self.capabilities = capabilities
235-
else:
236-
self.capabilities = self.get_default_capabilities()
233+
self.capabilities = capabilities
237234

238235
# This is included in some logs to provide more context
239236
self.environment_info = environment_info
@@ -245,14 +242,6 @@ def __init__(
245242
self.lc_check_sync_state.clock = self.reactor
246243
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL
247244

248-
def get_default_capabilities(self) -> list[str]:
249-
"""Return the default capabilities for this manager."""
250-
return [
251-
self._settings.CAPABILITY_WHITELIST,
252-
self._settings.CAPABILITY_SYNC_VERSION,
253-
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN
254-
]
255-
256245
def start(self) -> None:
257246
""" A factory must be started only once. And it is usually automatically started.
258247
"""

hathor/metrics.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -248,23 +248,26 @@ def collect_peer_connection_metrics(self) -> None:
248248
self.peer_connection_metrics.clear()
249249

250250
for connection in self.connections.connections:
251-
if not connection._peer:
251+
peer = connection.get_peer_if_set()
252+
if not peer:
252253
# A connection without peer will not be able to communicate
253254
# So we can just discard it for the sake of the metrics
254255
continue
255256

257+
entrypoint = connection.get_entrypoint()
258+
metrics = connection.get_metrics()
256259
metric = PeerConnectionMetrics(
257-
connection_string=str(connection.entrypoint) if connection.entrypoint else "",
258-
peer_id=str(connection.peer.id),
260+
connection_string=str(entrypoint) if entrypoint else '',
261+
peer_id=str(peer.id),
259262
network=settings.NETWORK_NAME,
260-
received_messages=connection.metrics.received_messages,
261-
sent_messages=connection.metrics.sent_messages,
262-
received_bytes=connection.metrics.received_bytes,
263-
sent_bytes=connection.metrics.sent_bytes,
264-
received_txs=connection.metrics.received_txs,
265-
discarded_txs=connection.metrics.discarded_txs,
266-
received_blocks=connection.metrics.received_blocks,
267-
discarded_blocks=connection.metrics.discarded_blocks,
263+
received_messages=metrics.received_messages,
264+
sent_messages=metrics.sent_messages,
265+
received_bytes=metrics.received_bytes,
266+
sent_bytes=metrics.sent_bytes,
267+
received_txs=metrics.received_txs,
268+
discarded_txs=metrics.discarded_txs,
269+
received_blocks=metrics.received_blocks,
270+
discarded_blocks=metrics.discarded_blocks,
268271
)
269272

270273
self.peer_connection_metrics.append(metric)

hathor/p2p/manager.py

+40-33
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,7 @@ def has_synced_peer(self) -> bool:
338338
"""
339339
connections = list(self.iter_ready_connections())
340340
for conn in connections:
341-
assert conn.state is not None
342-
assert isinstance(conn.state, ReadyState)
343-
if conn.state.is_synced():
341+
if conn.is_synced():
344342
return True
345343
return False
346344

@@ -357,9 +355,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
357355
connections = list(self.iter_ready_connections())
358356
self.rng.shuffle(connections)
359357
for conn in connections:
360-
assert conn.state is not None
361-
assert isinstance(conn.state, ReadyState)
362-
conn.state.send_tx_to_peer(tx)
358+
conn.send_tx_to_peer(tx)
363359

364360
def disconnect_all_peers(self, *, force: bool = False) -> None:
365361
"""Disconnect all peers."""
@@ -396,12 +392,10 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
396392

397393
def on_peer_ready(self, protocol: HathorProtocol) -> None:
398394
"""Called when a peer is ready."""
399-
assert protocol.peer is not None
400-
self.verified_peer_storage.add_or_replace(protocol.peer)
401-
assert protocol.peer.id is not None
402-
395+
protocol_peer = protocol.get_peer()
396+
self.verified_peer_storage.add_or_replace(protocol_peer)
403397
self.handshaking_peers.remove(protocol)
404-
self.unverified_peer_storage.pop(protocol.peer.id, None)
398+
self.unverified_peer_storage.pop(protocol_peer.id, None)
405399

406400
# we emit the event even if it's a duplicate peer as a matching
407401
# NETWORK_PEER_DISCONNECTED will be emitted regardless
@@ -411,7 +405,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
411405
peers_count=self._get_peers_count()
412406
)
413407

414-
if protocol.peer.id in self.connected_peers:
408+
if protocol_peer.id in self.connected_peers:
415409
# connected twice to same peer
416410
self.log.warn('duplicate connection to peer', protocol=protocol)
417411
conn = self.get_connection_to_drop(protocol)
@@ -420,35 +414,35 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
420414
# the new connection is being dropped, so don't save it to connected_peers
421415
return
422416

423-
self.connected_peers[protocol.peer.id] = protocol
417+
self.connected_peers[protocol_peer.id] = protocol
424418

425419
# In case it was a retry, we must reset the data only here, after it gets ready
426-
protocol.peer.info.reset_retry_timestamp()
420+
protocol_peer.info.reset_retry_timestamp()
427421

428422
if len(self.connected_peers) <= self.MAX_ENABLED_SYNC:
429423
protocol.enable_sync()
430424

431-
if protocol.peer.id in self.always_enable_sync:
425+
if protocol_peer.id in self.always_enable_sync:
432426
protocol.enable_sync()
433427

434428
# Notify other peers about this new peer connection.
435-
self.relay_peer_to_ready_connections(protocol.peer)
429+
self.relay_peer_to_ready_connections(protocol_peer)
436430

437431
def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
438432
"""Relay peer to all ready connections."""
439433
for conn in self.iter_ready_connections():
440-
if conn.peer == peer:
434+
if conn.get_peer() == peer:
441435
continue
442-
assert isinstance(conn.state, ReadyState)
443-
conn.state.send_peers([peer])
436+
conn.send_peers([peer])
444437

445438
def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
446439
"""Called when a peer disconnect."""
447440
self.connections.discard(protocol)
448441
if protocol in self.handshaking_peers:
449442
self.handshaking_peers.remove(protocol)
450-
if protocol._peer is not None:
451-
existing_protocol = self.connected_peers.pop(protocol.peer.id, None)
443+
protocol_peer = protocol.get_peer_if_set()
444+
if protocol_peer is not None:
445+
existing_protocol = self.connected_peers.pop(protocol_peer.id, None)
452446
if existing_protocol is None:
453447
# in this case, the connection was closed before it got to READY state
454448
return
@@ -458,7 +452,7 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
458452
# A check for duplicate connections is done during PEER_ID state, but there's still a
459453
# chance it can happen if both connections start at the same time and none of them has
460454
# reached READY state while the other is on PEER_ID state
461-
self.connected_peers[protocol.peer.id] = existing_protocol
455+
self.connected_peers[protocol_peer.id] = existing_protocol
462456
self.pubsub.publish(
463457
HathorEvents.NETWORK_PEER_DISCONNECTED,
464458
protocol=protocol,
@@ -480,8 +474,9 @@ def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]:
480474
for connecting_peer in self.connecting_peers.values():
481475
yield connecting_peer.entrypoint
482476
for protocol in self.handshaking_peers:
483-
if protocol.entrypoint is not None:
484-
yield protocol.entrypoint
477+
protocol_entrypoint = protocol.get_entrypoint()
478+
if protocol_entrypoint is not None:
479+
yield protocol_entrypoint
485480
else:
486481
self.log.warn('handshaking protocol has empty connection string', protocol=protocol)
487482

@@ -723,30 +718,28 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
723718
We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
724719
on the peer id string is used for this comparison.
725720
"""
726-
assert protocol.peer is not None
727-
assert protocol.peer.id is not None
728-
assert protocol.my_peer.id is not None
729-
other_connection = self.connected_peers[protocol.peer.id]
730-
if bytes(protocol.my_peer.id) > bytes(protocol.peer.id):
721+
protocol_peer = protocol.get_peer()
722+
other_connection = self.connected_peers[protocol_peer.id]
723+
if bytes(self.my_peer.id) > bytes(protocol_peer.id):
731724
# connection started by me is kept
732-
if not protocol.inbound:
725+
if not protocol.is_inbound():
733726
# other connection is dropped
734727
return other_connection
735728
else:
736729
# this was started by peer, so drop it
737730
return protocol
738731
else:
739732
# connection started by peer is kept
740-
if not protocol.inbound:
733+
if not protocol.is_inbound():
741734
return protocol
742735
else:
743736
return other_connection
744737

745738
def drop_connection(self, protocol: HathorProtocol) -> None:
746739
""" Drop a connection
747740
"""
748-
assert protocol.peer is not None
749-
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
741+
protocol_peer = protocol.get_peer()
742+
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
750743
protocol.send_error_and_close_connection('Connection droped')
751744

752745
def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
@@ -843,3 +836,17 @@ def reload_entrypoints_and_connections(self) -> None:
843836
self.log.warn('Killing all connections and resetting entrypoints...')
844837
self.disconnect_all_peers(force=True)
845838
self.my_peer.reload_entrypoints_from_source_file()
839+
840+
def get_peers_whitelist(self) -> list[PeerId]:
841+
assert self.manager is not None
842+
return self.manager.peers_whitelist
843+
844+
def get_verified_peers(self) -> Iterable[PublicPeer]:
845+
return self.verified_peer_storage.values()
846+
847+
def get_randbytes(self, n: int) -> bytes:
848+
return self.rng.randbytes(n)
849+
850+
def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
851+
assert self.manager is not None
852+
return peer_id in self.manager.peers_whitelist

hathor/p2p/protocol.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import time
1616
from enum import Enum
17-
from typing import TYPE_CHECKING, Any, Coroutine, Generator, Optional, cast
17+
from typing import TYPE_CHECKING, Any, Coroutine, Generator, Iterable, Optional, cast
1818

1919
from structlog import get_logger
2020
from twisted.internet.defer import Deferred
@@ -33,6 +33,7 @@
3333
from hathor.p2p.sync_version import SyncVersion
3434
from hathor.p2p.utils import format_address
3535
from hathor.profiler import get_cpu_profiler
36+
from hathor.transaction import BaseTransaction
3637

3738
if TYPE_CHECKING:
3839
from hathor.manager import HathorManager # noqa: F401
@@ -390,6 +391,33 @@ def disable_sync(self) -> None:
390391
self.log.info('disable sync')
391392
self.state.sync_agent.disable_sync()
392393

394+
def is_synced(self) -> bool:
395+
assert isinstance(self.state, ReadyState)
396+
return self.state.is_synced()
397+
398+
def send_tx_to_peer(self, tx: BaseTransaction) -> None:
399+
assert isinstance(self.state, ReadyState)
400+
return self.state.send_tx_to_peer(tx)
401+
402+
def get_peer(self) -> PublicPeer:
403+
return self.peer
404+
405+
def get_peer_if_set(self) -> PublicPeer | None:
406+
return self._peer
407+
408+
def send_peers(self, peers: Iterable[PublicPeer]) -> None:
409+
assert isinstance(self.state, ReadyState)
410+
self.state.send_peers(peers)
411+
412+
def get_entrypoint(self) -> Entrypoint | None:
413+
return self.entrypoint
414+
415+
def is_inbound(self) -> bool:
416+
return self.inbound
417+
418+
def get_metrics(self) -> 'ConnectionMetrics':
419+
return self.metrics
420+
393421

394422
class HathorLineReceiver(LineReceiver, HathorProtocol):
395423
""" Implements HathorProtocol in a LineReceiver protocol.

hathor/p2p/states/peer_id.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
145145
146146
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
147147
"""
148-
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
148+
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
149149
# never block whitelisted peers
150150
if peer_is_whitelisted:
151151
return False

hathor/p2p/states/ready.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def handle_get_peers(self, payload: str) -> None:
155155
""" Executed when a GET-PEERS command is received. It just responds with
156156
a list of all known peers.
157157
"""
158-
for peer in self.protocol.connections.verified_peer_storage.values():
158+
for peer in self.protocol.connections.get_verified_peers():
159159
self.send_peers([peer])
160160

161161
def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
@@ -195,8 +195,7 @@ def send_ping(self) -> None:
195195
"""
196196
# Add a salt number to prevent peers from faking rtt.
197197
self.ping_start_time = self.reactor.seconds()
198-
rng = self.protocol.connections.rng
199-
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
198+
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
200199
self.send_message(ProtocolMessages.PING, self.ping_salt)
201200

202201
def send_pong(self, salt: str) -> None:

0 commit comments

Comments
 (0)