Skip to content

Commit 3b27e43

Browse files
committed
refactor(p2p): minor pre-IPC refactors
1 parent 07ecfc0 commit 3b27e43

13 files changed

+96
-53
lines changed

hathor/builder/builder.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ def build(self) -> BuildArtifacts:
232232
vertex_handler = self._get_or_create_vertex_handler()
233233
vertex_parser = self._get_or_create_vertex_parser()
234234
poa_block_producer = self._get_or_create_poa_block_producer()
235+
capabilities = self._get_or_create_capabilities()
235236

236237
if self._enable_address_index:
237238
indexes.enable_address_index(pubsub)
@@ -263,7 +264,7 @@ def build(self) -> BuildArtifacts:
263264
wallet=wallet,
264265
rng=self._rng,
265266
checkpoints=self._checkpoints,
266-
capabilities=self._capabilities,
267+
capabilities=capabilities,
267268
environment_info=get_environment_info(self._cmdline, str(peer.id)),
268269
bit_signaling_service=bit_signaling_service,
269270
verification_service=verification_service,
@@ -642,6 +643,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:
642643

643644
return self._poa_block_producer
644645

646+
def _get_or_create_capabilities(self) -> list[str]:
647+
if self._capabilities is None:
648+
settings = self._get_or_create_settings()
649+
self._capabilities = settings.get_default_capabilities()
650+
651+
return self._capabilities
652+
645653
def use_memory(self) -> 'Builder':
646654
self.check_if_can_modify()
647655
self._storage_type = StorageType.MEMORY

hathor/builder/cli_builder.py

+2
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
317317
)
318318

319319
cpu_mining_service = CpuMiningService()
320+
capabilities = settings.get_default_capabilities()
320321

321322
p2p_manager = ConnectionsManager(
322323
settings=settings,
@@ -384,6 +385,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
384385
vertex_handler=vertex_handler,
385386
vertex_parser=vertex_parser,
386387
poa_block_producer=poa_block_producer,
388+
capabilities=capabilities,
387389
)
388390

389391
if self._args.x_ipython_kernel:

hathor/conf/settings.py

+8
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,14 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
457457
validators=_VALIDATORS
458458
)
459459

460+
def get_default_capabilities(self) -> list[str]:
461+
"""Return the default capabilities."""
462+
return [
463+
self.CAPABILITY_WHITELIST,
464+
self.CAPABILITY_SYNC_VERSION,
465+
self.CAPABILITY_GET_BEST_BLOCKCHAIN
466+
]
467+
460468

461469
def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
462470
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""

hathor/manager.py

+2-13
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ def __init__(
109109
execution_manager: ExecutionManager,
110110
vertex_handler: VertexHandler,
111111
vertex_parser: VertexParser,
112+
capabilities: list[str],
112113
hostname: Optional[str] = None,
113114
wallet: Optional[BaseWallet] = None,
114-
capabilities: Optional[list[str]] = None,
115115
checkpoints: Optional[list[Checkpoint]] = None,
116116
rng: Optional[Random] = None,
117117
environment_info: Optional[EnvironmentInfo] = None,
@@ -231,10 +231,7 @@ def __init__(
231231
self.peers_whitelist: list[PeerId] = []
232232

233233
# List of capabilities of the peer
234-
if capabilities is not None:
235-
self.capabilities = capabilities
236-
else:
237-
self.capabilities = self.get_default_capabilities()
234+
self.capabilities = capabilities
238235

239236
# This is included in some logs to provide more context
240237
self.environment_info = environment_info
@@ -246,14 +243,6 @@ def __init__(
246243
self.lc_check_sync_state.clock = self.reactor
247244
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL
248245

249-
def get_default_capabilities(self) -> list[str]:
250-
"""Return the default capabilities for this manager."""
251-
return [
252-
self._settings.CAPABILITY_WHITELIST,
253-
self._settings.CAPABILITY_SYNC_VERSION,
254-
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN
255-
]
256-
257246
def start(self) -> None:
258247
""" A factory must be started only once. And it is usually automatically started.
259248
"""

hathor/metrics.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -248,23 +248,25 @@ def collect_peer_connection_metrics(self) -> None:
248248
self.peer_connection_metrics.clear()
249249

250250
for connection in self.connections.get_connected_peers():
251-
if not connection._peer:
251+
peer = connection.get_peer_if_set()
252+
if not peer:
252253
# A connection without peer will not be able to communicate
253254
# So we can just discard it for the sake of the metrics
254255
continue
255256

257+
metrics = connection.get_metrics()
256258
metric = PeerConnectionMetrics(
257259
connection_string=str(connection.addr),
258260
peer_id=str(connection.peer.id),
259261
network=settings.NETWORK_NAME,
260-
received_messages=connection.metrics.received_messages,
261-
sent_messages=connection.metrics.sent_messages,
262-
received_bytes=connection.metrics.received_bytes,
263-
sent_bytes=connection.metrics.sent_bytes,
264-
received_txs=connection.metrics.received_txs,
265-
discarded_txs=connection.metrics.discarded_txs,
266-
received_blocks=connection.metrics.received_blocks,
267-
discarded_blocks=connection.metrics.discarded_blocks,
262+
received_messages=metrics.received_messages,
263+
sent_messages=metrics.sent_messages,
264+
received_bytes=metrics.received_bytes,
265+
sent_bytes=metrics.sent_bytes,
266+
received_txs=metrics.received_txs,
267+
discarded_txs=metrics.discarded_txs,
268+
received_blocks=metrics.received_blocks,
269+
discarded_blocks=metrics.discarded_blocks,
268270
)
269271

270272
self.peer_connection_metrics.append(metric)

hathor/p2p/manager.py

+21-12
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,7 @@ def has_synced_peer(self) -> bool:
322322
"""
323323
connections = list(self.iter_ready_connections())
324324
for conn in connections:
325-
assert conn.state is not None
326-
assert isinstance(conn.state, ReadyState)
327-
if conn.state.is_synced():
325+
if conn.is_synced():
328326
return True
329327
return False
330328

@@ -341,9 +339,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
341339
connections = list(self.iter_ready_connections())
342340
self.rng.shuffle(connections)
343341
for conn in connections:
344-
assert conn.state is not None
345-
assert isinstance(conn.state, ReadyState)
346-
conn.state.send_tx_to_peer(tx)
342+
conn.send_tx_to_peer(tx)
347343

348344
def disconnect_all_peers(self, *, force: bool = False) -> None:
349345
"""Disconnect all peers."""
@@ -408,10 +404,9 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
408404
def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
409405
"""Relay peer to all ready connections."""
410406
for conn in self.iter_ready_connections():
411-
if conn.peer == peer:
407+
if conn.get_peer() == peer:
412408
continue
413-
assert isinstance(conn.state, ReadyState)
414-
conn.state.send_peers([peer])
409+
conn.send_peers([peer])
415410

416411
def on_handshake_disconnect(self, *, addr: PeerAddress) -> None:
417412
"""Called when a peer disconnects from a handshaking state (HELLO or PEER-ID)."""
@@ -666,9 +661,9 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
666661
def drop_connection(self, protocol: HathorProtocol) -> None:
667662
""" Drop a connection
668663
"""
669-
assert protocol.peer is not None
670-
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
671-
protocol.send_error_and_close_connection('Connection dropped')
664+
protocol_peer = protocol.get_peer()
665+
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
666+
protocol.send_error_and_close_connection('Connection droped')
672667

673668
def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
674669
""" Drop a connection by peer id
@@ -767,3 +762,17 @@ def reload_entrypoints_and_connections(self) -> None:
767762
self.log.warn('Killing all connections and resetting entrypoints...')
768763
self.disconnect_all_peers(force=True)
769764
self.my_peer.reload_entrypoints_from_source_file()
765+
766+
def get_peers_whitelist(self) -> list[PeerId]:
767+
assert self.manager is not None
768+
return self.manager.peers_whitelist
769+
770+
def get_verified_peers(self) -> Iterable[PublicPeer]:
771+
return self.verified_peer_storage.values()
772+
773+
def get_randbytes(self, n: int) -> bytes:
774+
return self.rng.randbytes(n)
775+
776+
def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
777+
assert self.manager is not None
778+
return peer_id in self.manager.peers_whitelist

hathor/p2p/protocol.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import time
1616
from enum import Enum
17-
from typing import TYPE_CHECKING, Optional, cast
17+
from typing import TYPE_CHECKING, Iterable, Optional, cast
1818

1919
from structlog import get_logger
2020
from twisted.internet import defer
@@ -34,6 +34,7 @@
3434
from hathor.p2p.sync_version import SyncVersion
3535
from hathor.p2p.utils import format_address
3636
from hathor.profiler import get_cpu_profiler
37+
from hathor.transaction import BaseTransaction
3738

3839
if TYPE_CHECKING:
3940
from hathor.manager import HathorManager # noqa: F401
@@ -410,6 +411,27 @@ def disable_sync(self) -> None:
410411
self.log.info('disable sync')
411412
self.state.sync_agent.disable_sync()
412413

414+
def is_synced(self) -> bool:
415+
assert isinstance(self.state, ReadyState)
416+
return self.state.is_synced()
417+
418+
def send_tx_to_peer(self, tx: BaseTransaction) -> None:
419+
assert isinstance(self.state, ReadyState)
420+
return self.state.send_tx_to_peer(tx)
421+
422+
def get_peer(self) -> PublicPeer:
423+
return self.peer
424+
425+
def get_peer_if_set(self) -> PublicPeer | None:
426+
return self._peer
427+
428+
def send_peers(self, peers: Iterable[PublicPeer]) -> None:
429+
assert isinstance(self.state, ReadyState)
430+
self.state.send_peers(peers)
431+
432+
def get_metrics(self) -> 'ConnectionMetrics':
433+
return self.metrics
434+
413435

414436
class HathorLineReceiver(LineReceiver, HathorProtocol):
415437
""" Implements HathorProtocol in a LineReceiver protocol.

hathor/p2p/states/peer_id.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
146146
147147
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
148148
"""
149-
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
149+
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
150150
# never block whitelisted peers
151151
if peer_is_whitelisted:
152152
return False

hathor/p2p/states/ready.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def handle_get_peers(self, payload: str) -> None:
155155
""" Executed when a GET-PEERS command is received. It just responds with
156156
a list of all known peers.
157157
"""
158-
for peer in self.protocol.connections.verified_peer_storage.values():
158+
for peer in self.protocol.connections.get_verified_peers():
159159
self.send_peers([peer])
160160

161161
def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
@@ -195,8 +195,7 @@ def send_ping(self) -> None:
195195
"""
196196
# Add a salt number to prevent peers from faking rtt.
197197
self.ping_start_time = self.reactor.seconds()
198-
rng = self.protocol.connections.rng
199-
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
198+
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
200199
self.send_message(ProtocolMessages.PING, self.ping_salt)
201200

202201
def send_pong(self, salt: str) -> None:

hathor/p2p/sync_v2/agent.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient
4141
from hathor.reactor import ReactorProtocol as Reactor
4242
from hathor.transaction import BaseTransaction, Block, Transaction
43+
from hathor.transaction.genesis import is_genesis
4344
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
4445
from hathor.transaction.vertex_parser import VertexParser
4546
from hathor.types import VertexId
@@ -381,7 +382,7 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
381382
# Not synced but same blockchain?
382383
if self.peer_best_block.height <= my_best_block.height:
383384
# Is peer behind me at the same blockchain?
384-
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
385+
common_block_hash = self.tx_storage.get_block_id_by_height(self.peer_best_block.height)
385386
if common_block_hash == self.peer_best_block.id:
386387
# If yes, nothing to sync from this peer.
387388
if not self.is_synced():
@@ -459,15 +460,13 @@ def send_get_tips(self) -> None:
459460
def handle_get_tips(self, _payload: str) -> None:
460461
""" Handle a GET-TIPS message.
461462
"""
462-
assert self.tx_storage.indexes is not None
463-
assert self.tx_storage.indexes.mempool_tips is not None
464463
if self._is_streaming:
465464
self.log.warn('can\'t send while streaming') # XXX: or can we?
466465
self.send_message(ProtocolMessages.MEMPOOL_END)
467466
return
468467
self.log.debug('handle_get_tips')
469468
# TODO Use a streaming of tips
470-
for tx_id in self.tx_storage.indexes.mempool_tips.get():
469+
for tx_id in self.tx_storage.get_mempool_tips():
471470
self.send_tips(tx_id)
472471
self.log.debug('tips end')
473472
self.send_message(ProtocolMessages.TIPS_END)
@@ -643,15 +642,14 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
643642
def handle_get_peer_block_hashes(self, payload: str) -> None:
644643
""" Handle a GET-PEER-BLOCK-HASHES message.
645644
"""
646-
assert self.tx_storage.indexes is not None
647645
heights = json.loads(payload)
648646
if len(heights) > 20:
649647
self.log.info('too many heights', heights_qty=len(heights))
650648
self.protocol.send_error_and_close_connection('GET-PEER-BLOCK-HASHES: too many heights')
651649
return
652650
data = []
653651
for h in heights:
654-
blk_hash = self.tx_storage.indexes.height.get(h)
652+
blk_hash = self.tx_storage.get_block_id_by_height(h)
655653
if blk_hash is None:
656654
break
657655
blk = self.tx_storage.get_transaction(blk_hash)
@@ -1152,7 +1150,7 @@ def handle_data(self, payload: str) -> None:
11521150
return
11531151

11541152
assert tx is not None
1155-
if self.protocol.node.tx_storage.get_genesis(tx.hash):
1153+
if is_genesis(tx.hash, settings=self._settings):
11561154
# We just got the data of a genesis tx/block. What should we do?
11571155
# Will it reduce peer reputation score?
11581156
return

hathor/p2p/sync_v2/blockchain_streaming_client.py

-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def handle_blocks(self, blk: Block) -> None:
102102
if self.tx_storage.partial_vertex_exists(blk.hash):
103103
# We reached a block we already have. Skip it.
104104
self._blk_repeated += 1
105-
is_duplicated = True
106105
if self._blk_repeated > self.max_repeated_blocks:
107106
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
108107
self.fails(TooManyRepeatedVerticesError())

hathor/transaction/storage/transaction_storage.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -533,12 +533,14 @@ def get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
533533
self.post_get_validation(tx)
534534
return tx
535535

536-
def get_block_by_height(self, height: int) -> Optional[Block]:
537-
"""Return a block in the best blockchain from the height index. This is fast."""
536+
def get_block_id_by_height(self, height: int) -> VertexId | None:
538537
assert self.indexes is not None
539-
ancestor_hash = self.indexes.height.get(height)
538+
return self.indexes.height.get(height)
540539

541-
return None if ancestor_hash is None else self.get_block(ancestor_hash)
540+
def get_block_by_height(self, height: int) -> Optional[Block]:
541+
"""Return a block in the best blockchain from the height index. This is fast."""
542+
block_id = self.get_block_id_by_height(height)
543+
return None if block_id is None else self.get_block(block_id)
542544

543545
def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]:
544546
"""Returns the transaction metadata with hash `hash_bytes`.
@@ -1137,6 +1139,11 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
11371139
with self.allow_partially_validated_context():
11381140
return self.transaction_exists(vertex_id)
11391141

1142+
def get_mempool_tips(self) -> set[VertexId]:
1143+
assert self.indexes is not None
1144+
assert self.indexes.mempool_tips is not None
1145+
return self.indexes.mempool_tips.get()
1146+
11401147

11411148
class BaseTransactionStorage(TransactionStorage):
11421149
indexes: Optional[IndexesManager]

tests/p2p/test_get_best_blockchain.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def test_node_without_get_best_blockchain_capability(self) -> None:
224224
protocol2 = connected_peers1[0]
225225
self.assertTrue(protocol2.capabilities.issuperset(set(cababilities_without_get_best_blockchain)))
226226
protocol1 = connected_peers2[0]
227-
default_capabilities = manager2.get_default_capabilities()
227+
default_capabilities = self._settings.get_default_capabilities()
228228
self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities)))
229229

230230
# assert the peers don't engage in get_best_blockchain messages

0 commit comments

Comments
 (0)