Skip to content

feat(p2p): run periodic bootstrap discovery #783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
dns_hosts.extend(self._args.dns)

if dns_hosts:
self.manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))
p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))

if self._args.bootstrap:
self.manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap))
p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(self._args.bootstrap))

if self._args.test_mode_tx_weight:
_set_test_mode(TestMode.TEST_TX_WEIGHT)
Expand All @@ -281,7 +281,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
self.log.warn('--memory-indexes is implied for memory storage or JSON storage')

for description in self._args.listen:
self.manager.add_listen_address(description)
p2p_manager.add_listen_address(description)

if self._args.peer_id_blacklist:
self.log.info('with peer id blacklist', blacklist=self._args.peer_id_blacklist)
Expand Down
3 changes: 3 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ def MAXIMUM_NUMBER_OF_HALVINGS(self) -> int:
# Time to update the peers that are running sync.
SYNC_UPDATE_INTERVAL: int = 10 * 60 # seconds

# Interval to re-run peer discovery.
PEER_DISCOVERY_INTERVAL: int = 5 * 60 # seconds

# All settings related to Feature Activation
FEATURE_ACTIVATION: FeatureActivationSettings = FeatureActivationSettings()

Expand Down
36 changes: 1 addition & 35 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from hathor.feature_activation.feature_service import FeatureService
from hathor.mining import BlockTemplate, BlockTemplates
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.profiler import get_cpu_profiler
Expand Down Expand Up @@ -183,8 +182,6 @@ def __init__(self,

self.consensus_algorithm = consensus_algorithm

self.peer_discoveries: list[PeerDiscovery] = []

self.connections = p2p_manager

self.metrics = Metrics(
Expand All @@ -209,9 +206,6 @@ def __init__(self,
# Thread pool used to resolve pow when sending tokens
self.pow_thread_pool = ThreadPool(minthreads=0, maxthreads=settings.MAX_POW_THREADS, name='Pow thread pool')

# List of addresses to listen for new connections (eg: [tcp:8000])
self.listen_addresses: list[str] = []

# Full verification execute all validations for transactions and blocks when initializing the node
# Can be activated on the command line with --full-verification
self._full_verification = full_verification
Expand Down Expand Up @@ -277,7 +271,6 @@ def start(self) -> None:
self.state = self.NodeState.INITIALIZING
self.pubsub.publish(HathorEvents.MANAGER_ON_START)
self._event_manager.load_started()
self.connections.start()
self.pow_thread_pool.start()

# Disable get transaction lock when initializing components
Expand All @@ -300,10 +293,7 @@ def start(self) -> None:
# Metric starts to capture data
self.metrics.start()

for description in self.listen_addresses:
self.listen(description)

self.do_discovery()
self.connections.start()

self.start_time = time.time()

Expand Down Expand Up @@ -353,13 +343,6 @@ def stop(self) -> Deferred:

return defer.DeferredList(waits)

def do_discovery(self) -> None:
"""
Do a discovery and connect on all discovery strategies.
"""
for peer_discovery in self.peer_discoveries:
peer_discovery.discover_and_connect(self.connections.connect_to)

def start_profiler(self, *, reset: bool = False) -> None:
"""
Start profiler. It can be activated from a web resource, as well.
Expand Down Expand Up @@ -695,12 +678,6 @@ def _sync_v2_resume_validations(self) -> None:
self.sync_v2_step_validations(depended_final_txs, quiet=True)
self.log.debug('pending validations finished')

def add_listen_address(self, addr: str) -> None:
self.listen_addresses.append(addr)

def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
self.peer_discoveries.append(peer_discovery)

def get_new_tx_parents(self, timestamp: Optional[float] = None) -> list[VertexId]:
"""Select which transactions will be confirmed by a new transaction.

Expand Down Expand Up @@ -1141,17 +1118,6 @@ def _log_if_feature_is_active(self, block: Block, feature: Feature) -> None:
if self._feature_service.is_feature_active(block=block, feature=feature):
self.log.info('Feature is ACTIVE for block', feature=feature.value, block_height=block.get_height())

def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
endpoint = self.connections.listen(description, use_ssl)
# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
# that we have will have a _port attribute
port = getattr(endpoint, '_port', None)

if self.hostname:
proto, _, _ = description.partition(':')
address = '{}://{}:{}'.format(proto, self.hostname, port)
self.my_peer.entrypoints.append(address)

def has_sync_version_capability(self) -> bool:
return settings.CAPABILITY_SYNC_VERSION in self.capabilities

Expand Down
56 changes: 50 additions & 6 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from hathor.conf import HathorSettings
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import PeerStorage
from hathor.p2p.protocol import HathorProtocol
Expand Down Expand Up @@ -73,6 +74,7 @@ class ConnectionsManager:
"""
MAX_ENABLED_SYNC = settings.MAX_ENABLED_SYNC
SYNC_UPDATE_INTERVAL = settings.SYNC_UPDATE_INTERVAL
PEER_DISCOVERY_INTERVAL = settings.PEER_DISCOVERY_INTERVAL

class GlobalRateLimiter:
SEND_TIPS = 'NodeSyncTimestamp.send_tips'
Expand Down Expand Up @@ -114,6 +116,12 @@ def __init__(self,

self.network = network

# List of addresses to listen for new connections (eg: [tcp:8000])
self.listen_addresses: list[str] = []

# List of peer discovery methods.
self.peer_discoveries: list[PeerDiscovery] = []

# Options
self.localhost_only = False

Expand Down Expand Up @@ -179,6 +187,9 @@ def __init__(self,
self.enable_sync_v1_1 = enable_sync_v1_1
self.enable_sync_v2 = enable_sync_v2

# Timestamp when the last discovery ran
self._last_discovery: float = 0.

# sync-manager factories
self._sync_factories = {}
if enable_sync_v1:
Expand All @@ -198,6 +209,21 @@ def set_manager(self, manager: 'HathorManager') -> None:
indexes.enable_deps_index()
indexes.enable_mempool_index()

def add_listen_address(self, addr: str) -> None:
"""Add address to listen for incoming connections."""
self.listen_addresses.append(addr)

def add_peer_discovery(self, peer_discovery: PeerDiscovery) -> None:
"""Add a peer discovery method."""
self.peer_discoveries.append(peer_discovery)

def do_discovery(self) -> None:
"""
Do a discovery and connect on all discovery strategies.
"""
for peer_discovery in self.peer_discoveries:
peer_discovery.discover_and_connect(self.connect_to)

def disable_rate_limiter(self) -> None:
"""Disable global rate limiter."""
self.rate_limiter.unset_limit(self.GlobalRateLimiter.SEND_TIPS)
Expand All @@ -213,9 +239,15 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) ->
def start(self) -> None:
self.lc_reconnect.start(5, now=False)
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

if settings.ENABLE_PEER_WHITELIST:
self._start_whitelist_reconnect()

for description in self.listen_addresses:
self.listen(description)

self.do_discovery()

def _start_whitelist_reconnect(self) -> None:
# The deferred returned by the LoopingCall start method
# executes when the looping call stops running
Expand Down Expand Up @@ -424,7 +456,7 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) ->
if peer.id == self.my_peer.id:
return
peer = self.received_peer_storage.add_or_merge(peer)
self.connect_to_if_not_connected(peer, 0)
self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))

def reconnect_to_all(self) -> None:
""" It is called by the `lc_reconnect` timer and tries to connect to all known
Expand All @@ -434,13 +466,14 @@ def reconnect_to_all(self) -> None:
"""
# when we have no connected peers left, run the discovery process again
assert self.manager is not None
if len(self.connected_peers) < 1:
self.manager.do_discovery()
now = int(self.reactor.seconds())
now = self.reactor.seconds()
if now - self._last_discovery >= self.PEER_DISCOVERY_INTERVAL:
self._last_discovery = now
self.do_discovery()
# We need to use list() here because the dict might change inside connect_to_if_not_connected
# when the peer is disconnected and without entrypoint
for peer in list(self.peer_storage.values()):
self.connect_to_if_not_connected(peer, now)
self.connect_to_if_not_connected(peer, int(now))

def update_whitelist(self) -> Deferred[None]:
from twisted.web.client import Agent, readBody
Expand Down Expand Up @@ -580,7 +613,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O
)

def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamServerEndpoint:
""" Start to listen to new connection according to the description.
""" Start to listen for new connection according to the description.

If `ssl` is True, then the connection will be wraped by a TLS.

Expand All @@ -607,6 +640,17 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer

self.log.info('listen on', endpoint=description)
endpoint.listen(factory)

# XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
# that we have will have a _port attribute
port = getattr(endpoint, '_port', None)
assert self.manager is not None
if self.manager.hostname and port is not None:
proto, _, _ = description.partition(':')
address = '{}://{}:{}'.format(proto, self.manager.hostname, port)
assert self.manager.my_peer is not None
self.manager.my_peer.entrypoints.append(address)

return endpoint

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
Expand Down