diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index 94bd07ff2..7aefaee3f 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -157,6 +157,9 @@ def __init__(self, # List of known peers. self.peer_storage = PeerStorage() # dict[string (peer.id), PeerId] + # Maximum unseen time before removing a peer (seconds). + self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes + # A timer to try to reconnect to the disconnect known peers. self.lc_reconnect = LoopingCall(self.reconnect_to_all) self.lc_reconnect.clock = self.reactor @@ -394,11 +397,15 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: protocol.enable_sync() # Notify other peers about this new peer connection. + self.relay_peer_to_ready_connections(protocol.peer) + + def relay_peer_to_ready_connections(self, peer: PeerId) -> None: + """Relay peer to all ready connections.""" for conn in self.iter_ready_connections(): - if conn != protocol: - assert conn.state is not None - assert isinstance(conn.state, ReadyState) - conn.state.send_peers([protocol]) + if conn.peer == peer: + continue + assert isinstance(conn.state, ReadyState) + conn.state.send_peers([peer]) def on_peer_disconnect(self, protocol: HathorProtocol) -> None: """Called when a peer disconnect.""" @@ -459,12 +466,28 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) -> peer = self.received_peer_storage.add_or_merge(peer) self.connect_to_if_not_connected(peer, int(self.reactor.seconds())) + def peers_cleanup(self) -> None: + """Clean up aged peers.""" + now = self.reactor.seconds() + to_be_removed: list[PeerId] = [] + for peer in self.peer_storage.values(): + assert peer.id is not None + if self.is_peer_connected(peer.id): + continue + dt = now - peer.last_seen + if dt > self.max_peer_unseen_dt: + to_be_removed.append(peer) + + for peer in to_be_removed: + self.peer_storage.remove(peer) + def reconnect_to_all(self) -> None: """ It is called by the `lc_reconnect` timer and tries to connect to all known peers. TODO(epnichols): Should we always connect to *all*? Should there be a max #? """ + self.peers_cleanup() # when we have no connected peers left, run the discovery process again assert self.manager is not None now = self.reactor.seconds() diff --git a/hathor/p2p/peer_id.py b/hathor/p2p/peer_id.py index 612459e7a..532502ab8 100644 --- a/hathor/p2p/peer_id.py +++ b/hathor/p2p/peer_id.py @@ -15,6 +15,7 @@ import base64 import hashlib from enum import Enum +from math import inf from typing import TYPE_CHECKING, Any, Generator, Optional, cast from cryptography import x509 @@ -61,6 +62,7 @@ class PeerId: retry_timestamp: int # should only try connecting to this peer after this timestamp retry_interval: int # how long to wait for next connection retry. It will double for each failure retry_attempts: int # how many retries were made + last_seen: float # last time this peer was seen flags: set[str] def __init__(self, auto_generate_keys: bool = True) -> None: @@ -73,6 +75,7 @@ def __init__(self, auto_generate_keys: bool = True) -> None: self.retry_timestamp = 0 self.retry_interval = 5 self.retry_attempts = 0 + self.last_seen = inf self.flags = set() self._certificate_options: Optional[CertificateOptions] = None diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index 822973bed..3df296466 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -293,7 +293,10 @@ def recv_message(self, cmd: ProtocolMessages, payload: str) -> Optional[Deferred """ assert self.state is not None - self.last_message = self.reactor.seconds() + now = self.reactor.seconds() + self.last_message = now + if self.peer is not None: + self.peer.last_seen = now self.reset_idle_timeout() if not self.ratelimit.add_hit(self.RateLimitKeys.GLOBAL): diff --git a/hathor/p2p/resources/status.py b/hathor/p2p/resources/status.py index 35a722e69..09be830ca 100644 --- a/hathor/p2p/resources/status.py +++ b/hathor/p2p/resources/status.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time - import hathor from hathor.api_util import Resource, set_cors from hathor.cli.openapi_files.register import register_resource @@ -34,11 +32,14 @@ class StatusResource(Resource): def __init__(self, manager): self._settings = get_settings() self.manager = manager + self.reactor = manager.reactor def render_GET(self, request): request.setHeader(b'content-type', b'application/json; charset=utf-8') set_cors(request, 'GET') + now = self.reactor.seconds() + connecting_peers = [] # TODO: refactor as not to use a private item for endpoint, deferred in self.manager.connections.connecting_peers.items(): @@ -53,7 +54,7 @@ def render_GET(self, request): handshaking_peers.append({ 'address': '{}:{}'.format(remote.host, remote.port), 'state': conn.state.state_name, - 'uptime': time.time() - conn.connection_time, + 'uptime': now - conn.connection_time, 'app_version': conn.app_version, }) @@ -65,12 +66,13 @@ def render_GET(self, request): connected_peers.append({ 'id': conn.peer.id, 'app_version': conn.app_version, - 'uptime': time.time() - conn.connection_time, + 'current_time': now, + 'uptime': now - conn.connection_time, 'address': '{}:{}'.format(remote.host, remote.port), 'state': conn.state.state_name, # 'received_bytes': conn.received_bytes, 'rtt': list(conn.state.rtt_window), - 'last_message': time.time() - conn.last_message, + 'last_message': now - conn.last_message, 'plugins': status, 'warning_flags': [flag.value for flag in conn.warning_flags], 'protocol_version': str(conn.sync_version), @@ -82,6 +84,7 @@ def render_GET(self, request): known_peers.append({ 'id': peer.id, 'entrypoints': peer.entrypoints, + 'last_seen': now - peer.last_seen, 'flags': [flag.value for flag in peer.flags], }) @@ -103,7 +106,7 @@ def render_GET(self, request): 'app_version': app, 'state': self.manager.state.value, 'network': self.manager.network, - 'uptime': time.time() - self.manager.start_time, + 'uptime': now - self.manager.start_time, 'entrypoints': self.manager.connections.my_peer.entrypoints, }, 'peers_whitelist': self.manager.peers_whitelist, diff --git a/hathor/p2p/states/ready.py b/hathor/p2p/states/ready.py index a93727719..3f945ff02 100644 --- a/hathor/p2p/states/ready.py +++ b/hathor/p2p/states/ready.py @@ -47,6 +47,11 @@ def __init__(self, protocol: 'HathorProtocol') -> None: self.lc_ping = LoopingCall(self.send_ping_if_necessary) self.lc_ping.clock = self.reactor + # LC to send GET_PEERS every once in a while. + self.lc_get_peers = LoopingCall(self.send_get_peers) + self.lc_get_peers.clock = self.reactor + self.get_peers_interval: int = 5 * 60 # Once every 5 minutes. + # Minimum interval between PING messages (in seconds). self.ping_interval: int = 3 @@ -110,6 +115,8 @@ def on_enter(self) -> None: self.protocol.on_peer_ready() self.lc_ping.start(1, now=False) + + self.lc_get_peers.start(self.get_peers_interval, now=False) self.send_get_peers() if self.lc_get_best_blockchain is not None: @@ -121,6 +128,9 @@ def on_exit(self) -> None: if self.lc_ping.running: self.lc_ping.stop() + if self.lc_get_peers.running: + self.lc_get_peers.stop() + if self.lc_get_best_blockchain is not None and self.lc_get_best_blockchain.running: self.lc_get_best_blockchain.stop() @@ -146,22 +156,21 @@ def handle_get_peers(self, payload: str) -> None: """ Executed when a GET-PEERS command is received. It just responds with a list of all known peers. """ - if self.protocol.connections: - self.send_peers(self.protocol.connections.iter_ready_connections()) + for peer in self.protocol.connections.peer_storage.values(): + self.send_peers([peer]) - def send_peers(self, connections: Iterable['HathorProtocol']) -> None: - """ Send a PEERS command with a list of all connected peers. + def send_peers(self, peer_list: Iterable['PeerId']) -> None: + """ Send a PEERS command with a list of peers. """ - peers = [] - for conn in connections: - assert conn.peer is not None - peers.append({ - 'id': conn.peer.id, - 'entrypoints': conn.peer.entrypoints, - 'last_message': conn.last_message, - }) - self.send_message(ProtocolMessages.PEERS, json_dumps(peers)) - self.log.debug('send peers', peers=peers) + data = [] + for peer in peer_list: + if peer.entrypoints: + data.append({ + 'id': peer.id, + 'entrypoints': peer.entrypoints, + }) + self.send_message(ProtocolMessages.PEERS, json_dumps(data)) + self.log.debug('send peers', peers=data) def handle_peers(self, payload: str) -> None: """ Executed when a PEERS command is received. It updates the list diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index ce3aea322..8a53fd962 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -622,6 +622,14 @@ def handle_get_tips(self, payload: str) -> None: def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None: """Try to send a TIPS message. If rate limit has been reached, it schedules to send it later.""" + + # Filter for active delayed calls once one is executing + self._send_tips_call_later = [ + call_later + for call_later in self._send_tips_call_later + if call_later.active() + ] + if not self.global_rate_limiter.add_hit(self.GlobalRateLimiter.SEND_TIPS): self.log.debug('send_tips throttled') if len(self._send_tips_call_later) >= self._settings.MAX_GET_TIPS_DELAYED_CALLS: @@ -635,18 +643,12 @@ def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fals ) ) return + self._send_tips(timestamp, include_hashes, offset) def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None: """ Send a TIPS message. """ - # Filter for active delayed calls once one is executing - self._send_tips_call_later = [ - call_later - for call_later in self._send_tips_call_later - if call_later.active() - ] - if timestamp is None: timestamp = self.manager.tx_storage.latest_timestamp diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py index 8ce4f85b0..ae2a10a75 100644 --- a/tests/p2p/test_protocol.py +++ b/tests/p2p/test_protocol.py @@ -411,16 +411,18 @@ def test_two_connections(self): self.assertAndStepConn(self.conn, b'^GET-TIPS') self.assertAndStepConn(self.conn, b'^PING') - for _ in range(19): + for _ in range(20): self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN') - # peer1 should now send a PEERS with the new peer that just connected - self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN') - self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS') - self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS') - self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END') - self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG') - self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN') + self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS') + self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^GET-BEST-BLOCKCHAIN') + self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS') + self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN') + self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS') + self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS') + self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END') + self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG') + self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN') self.assertIsConnected() @inlineCallbacks