Skip to content

Commit b9abf07

Browse files
committed
feat(p2p): add ability to update peer_id.json with SIGUSR1
1 parent 23f764d commit b9abf07

File tree

13 files changed

+311
-71
lines changed

13 files changed

+311
-71
lines changed

hathor/builder/cli_builder.py

+2-12
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414

1515
import getpass
16-
import json
1716
import os
1817
import platform
1918
import sys
@@ -96,8 +95,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
9695
self.log = logger.new()
9796
self.reactor = reactor
9897

99-
peer_id = self.create_peer_id()
100-
98+
peer_id = PeerId.create_from_json_path(self._args.peer) if self._args.peer else PeerId()
10199
python = f'{platform.python_version()}-{platform.python_implementation()}'
102100

103101
self.log.info(
@@ -367,7 +365,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
367365
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')
368366

369367
for description in self._args.listen:
370-
p2p_manager.add_listen_address(description)
368+
p2p_manager.add_listen_address_description(description)
371369

372370
if self._args.peer_id_blacklist:
373371
self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist)
@@ -397,14 +395,6 @@ def get_hostname(self) -> Optional[str]:
397395
print('Hostname discovered and set to {}'.format(hostname))
398396
return hostname
399397

400-
def create_peer_id(self) -> PeerId:
401-
if not self._args.peer:
402-
peer_id = PeerId()
403-
else:
404-
data = json.load(open(self._args.peer, 'r'))
405-
peer_id = PeerId.create_from_json(data)
406-
return peer_id
407-
408398
def create_wallet(self) -> BaseWallet:
409399
if self._args.wallet == 'hd':
410400
kwargs: dict[str, Any] = {

hathor/cli/run_node.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,8 @@ def register_signal_handlers(self) -> None:
265265
def signal_usr1_handler(self, sig: int, frame: Any) -> None:
266266
"""Called when USR1 signal is received."""
267267
try:
268-
self.log.warn('USR1 received. Killing all connections...')
269-
if self.manager and self.manager.connections:
270-
self.manager.connections.disconnect_all_peers(force=True)
268+
self.log.warn('USR1 received.')
269+
self.manager.connections.reload_entrypoints_and_connections()
271270
except Exception:
272271
# see: https://docs.python.org/3/library/signal.html#note-on-signal-handlers-and-exceptions
273272
self.log.error('prevented exception from escaping the signal handler', exc_info=True)

hathor/conf/settings.py

+4
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,10 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
423423
OLD_MAX_MERKLE_PATH_LENGTH: int = 12
424424
NEW_MAX_MERKLE_PATH_LENGTH: int = 20
425425

426+
# Maximum number of tx tips to accept in the initial phase of the mempool sync 1000 is arbitrary, but it should be
427+
# more than enough for the forseeable future
428+
MAX_MEMPOOL_RECEIVING_TIPS: int = 1000
429+
426430
# Used to enable nano contracts.
427431
#
428432
# This should NEVER be enabled for mainnet and testnet, since both networks will

hathor/manager.py

+34-25
Original file line numberDiff line numberDiff line change
@@ -89,31 +89,33 @@ class UnhealthinessReason(str, Enum):
8989
# This is the interval to be used by the task to check if the node is synced
9090
CHECK_SYNC_STATE_INTERVAL = 30 # seconds
9191

92-
def __init__(self,
93-
reactor: Reactor,
94-
*,
95-
settings: HathorSettings,
96-
pubsub: PubSubManager,
97-
consensus_algorithm: ConsensusAlgorithm,
98-
daa: DifficultyAdjustmentAlgorithm,
99-
peer_id: PeerId,
100-
tx_storage: TransactionStorage,
101-
p2p_manager: ConnectionsManager,
102-
event_manager: EventManager,
103-
feature_service: FeatureService,
104-
bit_signaling_service: BitSignalingService,
105-
verification_service: VerificationService,
106-
cpu_mining_service: CpuMiningService,
107-
network: str,
108-
execution_manager: ExecutionManager,
109-
hostname: Optional[str] = None,
110-
wallet: Optional[BaseWallet] = None,
111-
capabilities: Optional[list[str]] = None,
112-
checkpoints: Optional[list[Checkpoint]] = None,
113-
rng: Optional[Random] = None,
114-
environment_info: Optional[EnvironmentInfo] = None,
115-
full_verification: bool = False,
116-
enable_event_queue: bool = False):
92+
def __init__(
93+
self,
94+
reactor: Reactor,
95+
*,
96+
settings: HathorSettings,
97+
pubsub: PubSubManager,
98+
consensus_algorithm: ConsensusAlgorithm,
99+
daa: DifficultyAdjustmentAlgorithm,
100+
peer_id: PeerId,
101+
tx_storage: TransactionStorage,
102+
p2p_manager: ConnectionsManager,
103+
event_manager: EventManager,
104+
feature_service: FeatureService,
105+
bit_signaling_service: BitSignalingService,
106+
verification_service: VerificationService,
107+
cpu_mining_service: CpuMiningService,
108+
network: str,
109+
execution_manager: ExecutionManager,
110+
hostname: Optional[str] = None,
111+
wallet: Optional[BaseWallet] = None,
112+
capabilities: Optional[list[str]] = None,
113+
checkpoints: Optional[list[Checkpoint]] = None,
114+
rng: Optional[Random] = None,
115+
environment_info: Optional[EnvironmentInfo] = None,
116+
full_verification: bool = False,
117+
enable_event_queue: bool = False,
118+
) -> None:
117119
"""
118120
:param reactor: Twisted reactor which handles the mainloop and the events.
119121
:param peer_id: Id of this node.
@@ -1173,6 +1175,13 @@ def get_cmd_path(self) -> Optional[str]:
11731175
"""Return the cmd path. If no cmd path is set, returns None."""
11741176
return self._cmd_path
11751177

1178+
def set_hostname_and_reset_connections(self, new_hostname: str) -> None:
1179+
"""Set the hostname and reset all connections."""
1180+
old_hostname = self.hostname
1181+
self.hostname = new_hostname
1182+
self.connections.update_hostname_entrypoints(old_hostname=old_hostname, new_hostname=self.hostname)
1183+
self.connections.disconnect_all_peers(force=True)
1184+
11761185

11771186
class ParentTxs(NamedTuple):
11781187
""" Tuple where the `must_include` hash, when present (at most 1), must be included in a pair, and a list of hashes

hathor/p2p/manager.py

+51-18
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616

1717
from structlog import get_logger
1818
from twisted.internet import endpoints
19+
from twisted.internet.address import IPv4Address, IPv6Address
1920
from twisted.internet.defer import Deferred
20-
from twisted.internet.interfaces import IProtocolFactory, IStreamClientEndpoint, IStreamServerEndpoint
21+
from twisted.internet.interfaces import IListeningPort, IProtocolFactory, IStreamClientEndpoint
2122
from twisted.internet.task import LoopingCall
2223
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
2324
from twisted.python.failure import Failure
@@ -108,8 +109,11 @@ def __init__(self,
108109

109110
self.network = network
110111

111-
# List of addresses to listen for new connections (eg: [tcp:8000])
112-
self.listen_addresses: list[str] = []
112+
# List of address descriptions to listen for new connections (eg: [tcp:8000])
113+
self.listen_address_descriptions: list[str] = []
114+
115+
# List of actual IP address instances to listen for new connections
116+
self._listen_addresses: list[IPv4Address | IPv6Address] = []
113117

114118
# List of peer discovery methods.
115119
self.peer_discoveries: list[PeerDiscovery] = []
@@ -239,9 +243,9 @@ def set_manager(self, manager: 'HathorManager') -> None:
239243
self.log.debug('enable sync-v2 indexes')
240244
indexes.enable_mempool_index()
241245

242-
def add_listen_address(self, addr: str) -> None:
246+
def add_listen_address_description(self, addr: str) -> None:
243247
"""Add address to listen for incoming connections."""
244-
self.listen_addresses.append(addr)
248+
self.listen_address_descriptions.append(addr)
245249

246250
def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
247251
"""Add a peer discovery method."""
@@ -279,7 +283,7 @@ def start(self) -> None:
279283
if self._settings.ENABLE_PEER_WHITELIST:
280284
self._start_whitelist_reconnect()
281285

282-
for description in self.listen_addresses:
286+
for description in self.listen_address_descriptions:
283287
self.listen(description)
284288

285289
self.do_discovery()
@@ -635,7 +639,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O
635639
peers_count=self._get_peers_count()
636640
)
637641

638-
def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamServerEndpoint:
642+
def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
639643
""" Start to listen for new connection according to the description.
640644
641645
If `ssl` is True, then the connection will be wraped by a TLS.
@@ -661,20 +665,43 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer
661665

662666
factory = NetfilterFactory(self, factory)
663667

664-
self.log.info('listen on', endpoint=description)
665-
endpoint.listen(factory)
668+
self.log.info('trying to listen on', endpoint=description)
669+
deferred: Deferred[IListeningPort] = endpoint.listen(factory)
670+
deferred.addCallback(self._on_listen_success, description)
671+
672+
def _on_listen_success(self, listening_port: IListeningPort, description: str) -> None:
673+
"""Callback to be called when listening to an endpoint succeeds."""
674+
self.log.info('success listening on', endpoint=description)
675+
address = listening_port.getHost()
676+
677+
if not isinstance(address, (IPv4Address, IPv6Address)):
678+
self.log.error(f'unhandled address type for endpoint "{description}": {str(type(address))}')
679+
return
680+
681+
self._listen_addresses.append(address)
666682

667-
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
668-
# that we have will have a _port attribute
669-
port = getattr(endpoint, '_port', None)
670683
assert self.manager is not None
671-
if self.manager.hostname and port is not None:
672-
proto, _, _ = description.partition(':')
673-
address = '{}://{}:{}'.format(proto, self.manager.hostname, port)
674-
assert self.manager.my_peer is not None
675-
self.manager.my_peer.entrypoints.append(address)
684+
if self.manager.hostname:
685+
self._add_hostname_entrypoint(self.manager.hostname, address)
676686

677-
return endpoint
687+
def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: str) -> None:
688+
"""Add new hostname entrypoints according to the listen addresses, and remove any old entrypoint."""
689+
assert self.manager is not None
690+
for address in self._listen_addresses:
691+
if old_hostname is not None:
692+
old_address_str = self._get_hostname_address_str(old_hostname, address)
693+
if old_address_str in self.my_peer.entrypoints:
694+
self.my_peer.entrypoints.remove(old_address_str)
695+
696+
self._add_hostname_entrypoint(new_hostname, address)
697+
698+
def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None:
699+
hostname_address_str = self._get_hostname_address_str(hostname, address)
700+
self.my_peer.entrypoints.append(hostname_address_str)
701+
702+
@staticmethod
703+
def _get_hostname_address_str(hostname: str, address: IPv4Address | IPv6Address) -> str:
704+
return '{}://{}:{}'.format(address.type, hostname, address.port).lower()
678705

679706
def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
680707
""" When there are duplicate connections, determine which one should be dropped.
@@ -796,3 +823,9 @@ def _sync_rotate_if_needed(self, *, force: bool = False) -> None:
796823

797824
for peer_id in info.to_enable:
798825
self.connected_peers[peer_id].enable_sync()
826+
827+
def reload_entrypoints_and_connections(self) -> None:
828+
"""Kill all connections and reload entrypoints from the original peer config file."""
829+
self.log.warn('Killing all connections and resetting entrypoints...')
830+
self.disconnect_all_peers(force=True)
831+
self.my_peer.reload_entrypoints_from_source_file()

hathor/p2p/peer_id.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import base64
1616
import hashlib
17+
import json
1718
from enum import Enum
1819
from math import inf
1920
from typing import TYPE_CHECKING, Any, Optional, cast
@@ -24,6 +25,7 @@
2425
from cryptography.hazmat.primitives import hashes, serialization
2526
from cryptography.hazmat.primitives.asymmetric import padding, rsa
2627
from OpenSSL.crypto import X509, PKey
28+
from structlog import get_logger
2729
from twisted.internet.interfaces import ISSLTransport
2830
from twisted.internet.ssl import Certificate, CertificateOptions, TLSVersion, trustRootFromCertificates
2931

@@ -35,6 +37,8 @@
3537
if TYPE_CHECKING:
3638
from hathor.p2p.protocol import HathorProtocol # noqa: F401
3739

40+
logger = get_logger()
41+
3842

3943
class InvalidPeerIdException(Exception):
4044
pass
@@ -64,8 +68,10 @@ class PeerId:
6468
retry_attempts: int # how many retries were made
6569
last_seen: float # last time this peer was seen
6670
flags: set[str]
71+
source_file: str | None
6772

6873
def __init__(self, auto_generate_keys: bool = True) -> None:
74+
self._log = logger.new()
6975
self._settings = get_global_settings()
7076
self.id = None
7177
self.private_key = None
@@ -159,9 +165,15 @@ def verify_signature(self, signature: bytes, data: bytes) -> bool:
159165
else:
160166
return True
161167

168+
@classmethod
169+
def create_from_json_path(cls, path: str) -> 'PeerId':
170+
"""Create a new PeerId from a JSON file."""
171+
data = json.load(open(path, 'r'))
172+
return PeerId.create_from_json(data)
173+
162174
@classmethod
163175
def create_from_json(cls, data: dict[str, Any]) -> 'PeerId':
164-
""" Create a new PeerId from a JSON.
176+
""" Create a new PeerId from JSON data.
165177
166178
It is used both to load a PeerId from disk and to create a PeerId
167179
from a peer connection.
@@ -408,3 +420,20 @@ def validate_certificate(self, protocol: 'HathorProtocol') -> bool:
408420
return False
409421

410422
return True
423+
424+
def reload_entrypoints_from_source_file(self) -> None:
425+
"""Update this PeerId's entrypoints from the json file."""
426+
if not self.source_file:
427+
raise Exception('Trying to reload entrypoints but no peer config file was provided.')
428+
429+
new_peer_id = PeerId.create_from_json_path(self.source_file)
430+
431+
if new_peer_id.id != self.id:
432+
self._log.error(
433+
'Ignoring peer id file update because the peer_id does not match.',
434+
current_peer_id=self.id,
435+
new_peer_id=new_peer_id.id,
436+
)
437+
return
438+
439+
self.entrypoints = new_peer_id.entrypoints

hathor/p2p/sync_v2/agent.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None:
142142
# Saves if I am in the middle of a mempool sync
143143
# we don't execute any sync while in the middle of it
144144
self.mempool_manager = SyncMempoolManager(self)
145-
self._receiving_tips: Optional[list[bytes]] = None
145+
self._receiving_tips: Optional[list[VertexId]] = None
146+
self.max_receiving_tips: int = self._settings.MAX_MEMPOOL_RECEIVING_TIPS
146147

147148
# Cache for get_tx calls
148149
self._get_tx_cache: OrderedDict[bytes, BaseTransaction] = OrderedDict()
@@ -476,7 +477,13 @@ def handle_tips(self, payload: str) -> None:
476477
data = json.loads(payload)
477478
data = [bytes.fromhex(x) for x in data]
478479
# filter-out txs we already have
479-
self._receiving_tips.extend(tx_id for tx_id in data if not self.partial_vertex_exists(tx_id))
480+
try:
481+
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
482+
except ValueError:
483+
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
484+
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
485+
if len(self._receiving_tips) > self.max_receiving_tips:
486+
self.protocol.send_error_and_close_connection(f'Too many tips: {len(self._receiving_tips)}')
480487

481488
def handle_tips_end(self, _payload: str) -> None:
482489
""" Handle a TIPS-END message.

hathor/p2p/utils.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,18 @@
3333
from hathor.transaction.genesis import get_representation_for_all_genesis
3434

3535

36-
def discover_hostname() -> Optional[str]:
37-
""" Try to discover your hostname. It is a synchonous operation and
36+
def discover_hostname(timeout: float | None = None) -> Optional[str]:
37+
""" Try to discover your hostname. It is a synchronous operation and
3838
should not be called from twisted main loop.
3939
"""
40-
return discover_ip_ipify()
40+
return discover_ip_ipify(timeout)
4141

4242

43-
def discover_ip_ipify() -> Optional[str]:
43+
def discover_ip_ipify(timeout: float | None = None) -> Optional[str]:
4444
""" Try to discover your IP address using ipify's api.
45-
It is a synchonous operation and should not be called from twisted main loop.
45+
It is a synchronous operation and should not be called from twisted main loop.
4646
"""
47-
response = requests.get('https://api.ipify.org')
47+
response = requests.get('https://api.ipify.org', timeout=timeout)
4848
if response.ok:
4949
# It may be either an ipv4 or ipv6 in string format.
5050
ip = response.text

0 commit comments

Comments
 (0)