diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 1f2e6f7b1..cbf5b6857 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -262,10 +262,10 @@ def create_manager(self, reactor: Reactor) -> HathorManager: dns_hosts.extend(self._args.dns) if dns_hosts: - self.manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts)) + p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts)) if self._args.bootstrap: - self.manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap)) + p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap)) if self._args.test_mode_tx_weight: _set_test_mode(TestMode.TEST_TX_WEIGHT) @@ -281,7 +281,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager: self.log.warn('--memory-indexes is implied for memory storage or JSON storage') for description in self._args.listen: - self.manager.add_listen_address(description) + p2p_manager.add_listen_address(description) if self._args.peer_id_blacklist: self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist) diff --git a/hathor/conf/settings.py b/hathor/conf/settings.py index 884c4e8e0..f7af7faaa 100644 --- a/hathor/conf/settings.py +++ b/hathor/conf/settings.py @@ -392,6 +392,9 @@ def MAXIMUM_NUMBER_OF_HALVINGS(self) -> int: # Time to update the peers that are running sync. SYNC_UPDATE_INTERVAL: int = 10 * 60 # seconds + # Interval to re-run peer discovery. + PEER_DISCOVERY_INTERVAL: int = 5 * 60 # seconds + # All settings related to Feature Activation FEATURE_ACTIVATION: FeatureActivationSettings = FeatureActivationSettings() diff --git a/hathor/manager.py b/hathor/manager.py index 99f46cc5e..7ae37d01a 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -44,7 +44,6 @@ from hathor.feature_activation.feature_service import FeatureService from hathor.mining import BlockTemplate, BlockTemplates from hathor.p2p.manager import ConnectionsManager -from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_id import PeerId from hathor.p2p.protocol import HathorProtocol from hathor.profiler import get_cpu_profiler @@ -183,8 +182,6 @@ def __init__(self, self.consensus_algorithm = consensus_algorithm - self.peer_discoveries: list[PeerDiscovery] = [] - self.connections = p2p_manager self.metrics = Metrics( @@ -209,9 +206,6 @@ def __init__(self, # Thread pool used to resolve pow when sending tokens self.pow_thread_pool = ThreadPool(minthreads=0, maxthreads=settings.MAX_POW_THREADS, name='Pow thread pool') - # List of addresses to listen for new connections (eg: [tcp:8000]) - self.listen_addresses: list[str] = [] - # Full verification execute all validations for transactions and blocks when initializing the node # Can be activated on the command line with --full-verification self._full_verification = full_verification @@ -277,7 +271,6 @@ def start(self) -> None: self.state = self.NodeState.INITIALIZING self.pubsub.publish(HathorEvents.MANAGER_ON_START) self._event_manager.load_started() - self.connections.start() self.pow_thread_pool.start() # Disable get transaction lock when initializing components @@ -300,10 +293,7 @@ def start(self) -> None: # Metric starts to capture data self.metrics.start() - for description in self.listen_addresses: - self.listen(description) - - self.do_discovery() + self.connections.start() self.start_time = time.time() @@ -353,13 +343,6 @@ def stop(self) -> Deferred: return defer.DeferredList(waits) - def do_discovery(self) -> None: - """ - Do a discovery and connect on all discovery strategies. - """ - for peer_discovery in self.peer_discoveries: - peer_discovery.discover_and_connect(self.connections.connect_to) - def start_profiler(self, *, reset: bool = False) -> None: """ Start profiler. It can be activated from a web resource, as well. @@ -695,12 +678,6 @@ def _sync_v2_resume_validations(self) -> None: self.sync_v2_step_validations(depended_final_txs, quiet=True) self.log.debug('pending validations finished') - def add_listen_address(self, addr: str) -> None: - self.listen_addresses.append(addr) - - def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None: - self.peer_discoveries.append(peer_discovery) - def get_new_tx_parents(self, timestamp: Optional[float] = None) -> list[VertexId]: """Select which transactions will be confirmed by a new transaction. @@ -1141,17 +1118,6 @@ def _log_if_feature_is_active(self, block: Block, feature: Feature) -> None: if self._feature_service.is_feature_active(block=block, feature=feature): self.log.info('Feature is ACTIVE for block', feature=feature.value, block_height=block.get_height()) - def listen(self, description: str, use_ssl: Optional[bool] = None) -> None: - endpoint = self.connections.listen(description, use_ssl) - # XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases - # that we have will have a _port attribute - port = getattr(endpoint, '_port', None) - - if self.hostname: - proto, _, _ = description.partition(':') - address = '{}://{}:{}'.format(proto, self.hostname, port) - self.my_peer.entrypoints.append(address) - def has_sync_version_capability(self) -> bool: return settings.CAPABILITY_SYNC_VERSION in self.capabilities diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index 007851ff6..4e6639185 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -24,6 +24,7 @@ from hathor.conf import HathorSettings from hathor.p2p.netfilter.factory import NetfilterFactory +from hathor.p2p.peer_discovery import PeerDiscovery from hathor.p2p.peer_id import PeerId from hathor.p2p.peer_storage import PeerStorage from hathor.p2p.protocol import HathorProtocol @@ -73,6 +74,7 @@ class ConnectionsManager: """ MAX_ENABLED_SYNC = settings.MAX_ENABLED_SYNC SYNC_UPDATE_INTERVAL = settings.SYNC_UPDATE_INTERVAL + PEER_DISCOVERY_INTERVAL = settings.PEER_DISCOVERY_INTERVAL class GlobalRateLimiter: SEND_TIPS = 'NodeSyncTimestamp.send_tips' @@ -114,6 +116,12 @@ def __init__(self, self.network = network + # List of addresses to listen for new connections (eg: [tcp:8000]) + self.listen_addresses: list[str] = [] + + # List of peer discovery methods. + self.peer_discoveries: list[PeerDiscovery] = [] + # Options self.localhost_only = False @@ -179,6 +187,9 @@ def __init__(self, self.enable_sync_v1_1 = enable_sync_v1_1 self.enable_sync_v2 = enable_sync_v2 + # Timestamp when the last discovery ran + self._last_discovery: float = 0. + # sync-manager factories self._sync_factories = {} if enable_sync_v1: @@ -198,6 +209,21 @@ def set_manager(self, manager: 'HathorManager') -> None: indexes.enable_deps_index() indexes.enable_mempool_index() + def add_listen_address(self, addr: str) -> None: + """Add address to listen for incoming connections.""" + self.listen_addresses.append(addr) + + def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None: + """Add a peer discovery method.""" + self.peer_discoveries.append(peer_discovery) + + def do_discovery(self) -> None: + """ + Do a discovery and connect on all discovery strategies. + """ + for peer_discovery in self.peer_discoveries: + peer_discovery.discover_and_connect(self.connect_to) + def disable_rate_limiter(self) -> None: """Disable global rate limiter.""" self.rate_limiter.unset_limit(self.GlobalRateLimiter.SEND_TIPS) @@ -213,9 +239,15 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) -> def start(self) -> None: self.lc_reconnect.start(5, now=False) self.lc_sync_update.start(self.lc_sync_update_interval, now=False) + if settings.ENABLE_PEER_WHITELIST: self._start_whitelist_reconnect() + for description in self.listen_addresses: + self.listen(description) + + self.do_discovery() + def _start_whitelist_reconnect(self) -> None: # The deferred returned by the LoopingCall start method # executes when the looping call stops running @@ -424,7 +456,7 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) -> if peer.id == self.my_peer.id: return peer = self.received_peer_storage.add_or_merge(peer) - self.connect_to_if_not_connected(peer, 0) + self.connect_to_if_not_connected(peer, int(self.reactor.seconds())) def reconnect_to_all(self) -> None: """ It is called by the `lc_reconnect` timer and tries to connect to all known @@ -434,13 +466,14 @@ def reconnect_to_all(self) -> None: """ # when we have no connected peers left, run the discovery process again assert self.manager is not None - if len(self.connected_peers) < 1: - self.manager.do_discovery() - now = int(self.reactor.seconds()) + now = self.reactor.seconds() + if now - self._last_discovery >= self.PEER_DISCOVERY_INTERVAL: + self._last_discovery = now + self.do_discovery() # We need to use list() here because the dict might change inside connect_to_if_not_connected # when the peer is disconnected and without entrypoint for peer in list(self.peer_storage.values()): - self.connect_to_if_not_connected(peer, now) + self.connect_to_if_not_connected(peer, int(now)) def update_whitelist(self) -> Deferred[None]: from twisted.web.client import Agent, readBody @@ -580,7 +613,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O ) def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamServerEndpoint: - """ Start to listen to new connection according to the description. + """ Start to listen for new connection according to the description. If `ssl` is True, then the connection will be wraped by a TLS. @@ -607,6 +640,17 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer self.log.info('listen on', endpoint=description) endpoint.listen(factory) + + # XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases + # that we have will have a _port attribute + port = getattr(endpoint, '_port', None) + assert self.manager is not None + if self.manager.hostname and port is not None: + proto, _, _ = description.partition(':') + address = '{}://{}:{}'.format(proto, self.manager.hostname, port) + assert self.manager.my_peer is not None + self.manager.my_peer.entrypoints.append(address) + return endpoint def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: