Skip to content

Commit 501ce63

Browse files
committed
feat(p2p): Run periodic bootstrap discovery
1 parent 1caa136 commit 501ce63

File tree

3 files changed

+46
-41
lines changed

3 files changed

+46
-41
lines changed

hathor/builder/cli_builder.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,10 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
258258
dns_hosts.extend(self._args.dns)
259259

260260
if dns_hosts:
261-
self.manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))
261+
p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))
262262

263263
if self._args.bootstrap:
264-
self.manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap))
264+
p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap))
265265

266266
if self._args.test_mode_tx_weight:
267267
_set_test_mode(TestMode.TEST_TX_WEIGHT)
@@ -277,7 +277,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
277277
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')
278278

279279
for description in self._args.listen:
280-
self.manager.add_listen_address(description)
280+
p2p_manager.add_listen_address(description)
281281

282282
if self._args.peer_id_blacklist:
283283
self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist)

hathor/manager.py

+1-35
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
from hathor.feature_activation.feature_service import FeatureService
4545
from hathor.mining import BlockTemplate, BlockTemplates
4646
from hathor.p2p.manager import ConnectionsManager
47-
from hathor.p2p.peer_discovery import PeerDiscovery
4847
from hathor.p2p.peer_id import PeerId
4948
from hathor.p2p.protocol import HathorProtocol
5049
from hathor.profiler import get_cpu_profiler
@@ -180,8 +179,6 @@ def __init__(self,
180179

181180
self.consensus_algorithm = consensus_algorithm
182181

183-
self.peer_discoveries: list[PeerDiscovery] = []
184-
185182
self.connections = p2p_manager
186183

187184
self.metrics = Metrics(
@@ -206,9 +203,6 @@ def __init__(self,
206203
# Thread pool used to resolve pow when sending tokens
207204
self.pow_thread_pool = ThreadPool(minthreads=0, maxthreads=settings.MAX_POW_THREADS, name='Pow thread pool')
208205

209-
# List of addresses to listen for new connections (eg: [tcp:8000])
210-
self.listen_addresses: list[str] = []
211-
212206
# Full verification execute all validations for transactions and blocks when initializing the node
213207
# Can be activated on the command line with --full-verification
214208
self._full_verification = full_verification
@@ -274,7 +268,6 @@ def start(self) -> None:
274268
self.state = self.NodeState.INITIALIZING
275269
self.pubsub.publish(HathorEvents.MANAGER_ON_START)
276270
self._event_manager.load_started()
277-
self.connections.start()
278271
self.pow_thread_pool.start()
279272

280273
# Disable get transaction lock when initializing components
@@ -297,10 +290,7 @@ def start(self) -> None:
297290
# Metric starts to capture data
298291
self.metrics.start()
299292

300-
for description in self.listen_addresses:
301-
self.listen(description)
302-
303-
self.do_discovery()
293+
self.connections.start()
304294

305295
self.start_time = time.time()
306296

@@ -350,13 +340,6 @@ def stop(self) -> Deferred:
350340

351341
return defer.DeferredList(waits)
352342

353-
def do_discovery(self) -> None:
354-
"""
355-
Do a discovery and connect on all discovery strategies.
356-
"""
357-
for peer_discovery in self.peer_discoveries:
358-
peer_discovery.discover_and_connect(self.connections.connect_to)
359-
360343
def start_profiler(self, *, reset: bool = False) -> None:
361344
"""
362345
Start profiler. It can be activated from a web resource, as well.
@@ -686,12 +669,6 @@ def _sync_v2_resume_validations(self) -> None:
686669
self.sync_v2_step_validations(depended_final_txs, quiet=True)
687670
self.log.debug('pending validations finished')
688671

689-
def add_listen_address(self, addr: str) -> None:
690-
self.listen_addresses.append(addr)
691-
692-
def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
693-
self.peer_discoveries.append(peer_discovery)
694-
695672
def get_new_tx_parents(self, timestamp: Optional[float] = None) -> list[VertexId]:
696673
"""Select which transactions will be confirmed by a new transaction.
697674
@@ -1128,17 +1105,6 @@ def _log_if_feature_is_active(self, block: Block, feature: Feature) -> None:
11281105
if self._feature_service.is_feature_active(block=block, feature=feature):
11291106
self.log.info('Feature is ACTIVE for block', feature=feature.value, block_height=block.get_height())
11301107

1131-
def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
1132-
endpoint = self.connections.listen(description, use_ssl)
1133-
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
1134-
# that we have will have a _port attribute
1135-
port = getattr(endpoint, '_port', None)
1136-
1137-
if self.hostname:
1138-
proto, _, _ = description.partition(':')
1139-
address = '{}://{}:{}'.format(proto, self.hostname, port)
1140-
self.my_peer.entrypoints.append(address)
1141-
11421108
def has_sync_version_capability(self) -> bool:
11431109
return settings.CAPABILITY_SYNC_VERSION in self.capabilities
11441110

hathor/p2p/manager.py

+42-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from hathor.conf import HathorSettings
2626
from hathor.p2p.netfilter.factory import NetfilterFactory
27+
from hathor.p2p.peer_discovery import PeerDiscovery
2728
from hathor.p2p.peer_id import PeerId
2829
from hathor.p2p.peer_storage import PeerStorage
2930
from hathor.p2p.protocol import HathorProtocol
@@ -114,6 +115,12 @@ def __init__(self,
114115

115116
self.network = network
116117

118+
# List of addresses to listen for new connections (eg: [tcp:8000])
119+
self.listen_addresses: list[str] = []
120+
121+
# List of peer discovery methods.
122+
self.peer_discoveries: list[PeerDiscovery] = []
123+
117124
# Options
118125
self.localhost_only = False
119126

@@ -202,6 +209,21 @@ def set_manager(self, manager: 'HathorManager') -> None:
202209
indexes.enable_deps_index()
203210
indexes.enable_mempool_index()
204211

212+
def add_listen_address(self, addr: str) -> None:
213+
"""Add address to listen for incoming connections."""
214+
self.listen_addresses.append(addr)
215+
216+
def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
217+
"""Add a peer discovery method."""
218+
self.peer_discoveries.append(peer_discovery)
219+
220+
def do_discovery(self) -> None:
221+
"""
222+
Do a discovery and connect on all discovery strategies.
223+
"""
224+
for peer_discovery in self.peer_discoveries:
225+
peer_discovery.discover_and_connect(self.connect_to)
226+
205227
def disable_rate_limiter(self) -> None:
206228
"""Disable global rate limiter."""
207229
self.rate_limiter.unset_limit(self.GlobalRateLimiter.SEND_TIPS)
@@ -217,9 +239,15 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) ->
217239
def start(self) -> None:
218240
self.lc_reconnect.start(5, now=False)
219241
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)
242+
220243
if settings.ENABLE_PEER_WHITELIST:
221244
self._start_whitelist_reconnect()
222245

246+
for description in self.listen_addresses:
247+
self.listen(description)
248+
249+
self.do_discovery()
250+
223251
def _start_whitelist_reconnect(self) -> None:
224252
# The deferred returned by the LoopingCall start method
225253
# executes when the looping call stops running
@@ -442,7 +470,7 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) ->
442470
if peer.id == self.my_peer.id:
443471
return
444472
peer = self.received_peer_storage.add_or_merge(peer)
445-
self.connect_to_if_not_connected(peer, 0)
473+
self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))
446474

447475
def peers_cleanup(self) -> None:
448476
"""Clean up aged peers."""
@@ -469,7 +497,7 @@ def reconnect_to_all(self) -> None:
469497
# when we have no connected peers left, run the discovery process again
470498
assert self.manager is not None
471499
if len(self.connected_peers) < 1:
472-
self.manager.do_discovery()
500+
self.do_discovery()
473501
now = int(self.reactor.seconds())
474502
# We need to use list() here because the dict might change inside connect_to_if_not_connected
475503
# when the peer is disconnected and without entrypoint
@@ -614,7 +642,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O
614642
)
615643

616644
def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamServerEndpoint:
617-
""" Start to listen to new connection according to the description.
645+
""" Start to listen for new connection according to the description.
618646
619647
If `ssl` is True, then the connection will be wraped by a TLS.
620648
@@ -641,6 +669,17 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer
641669

642670
self.log.info('listen on', endpoint=description)
643671
endpoint.listen(factory)
672+
673+
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
674+
# that we have will have a _port attribute
675+
port = getattr(endpoint, '_port', None)
676+
assert self.manager is not None
677+
if self.manager.hostname and port is not None:
678+
proto, _, _ = description.partition(':')
679+
address = '{}://{}:{}'.format(proto, self.manager.hostname, port)
680+
assert self.manager.my_peer is not None
681+
self.manager.my_peer.entrypoints.append(address)
682+
644683
return endpoint
645684

646685
def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:

0 commit comments

Comments
 (0)