Skip to content

Commit a6ceae7

Browse files
committed
feat(p2p): Relay all recently-seen peers for neighbors.
1 parent 2542e8a commit a6ceae7

File tree

5 files changed

+66
-24
lines changed

5 files changed

+66
-24
lines changed

hathor/p2p/manager.py

+25-3
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ def __init__(self,
150150
# List of known peers.
151151
self.peer_storage = PeerStorage() # dict[string (peer.id), PeerId]
152152

153+
# Maximum unseen time before removing a peer (seconds).
154+
self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes
155+
153156
# A timer to try to reconnect to the disconnect known peers.
154157
self.lc_reconnect = LoopingCall(self.reconnect_to_all)
155158
self.lc_reconnect.clock = self.reactor
@@ -375,11 +378,14 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
375378
protocol.enable_sync()
376379

377380
# Notify other peers about this new peer connection.
381+
self.relay_peer_to_ready_connections(protocol.my_peer)
382+
383+
def relay_peer_to_ready_connections(self, peer: PeerId) -> None:
384+
"""Relay peer to all ready connections."""
378385
for conn in self.iter_ready_connections():
379-
if conn != protocol:
380-
assert conn.state is not None
386+
if conn.peer != peer:
381387
assert isinstance(conn.state, ReadyState)
382-
conn.state.send_peers([protocol])
388+
conn.state.send_peers([peer])
383389

384390
def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
385391
"""Called when a peer disconnect."""
@@ -440,12 +446,28 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) ->
440446
peer = self.received_peer_storage.add_or_merge(peer)
441447
self.connect_to_if_not_connected(peer, 0)
442448

449+
def peers_cleanup(self) -> None:
450+
"""Clean up aged peers."""
451+
now = self.reactor.seconds()
452+
to_be_removed: list[PeerId] = []
453+
for peer in self.peer_storage.values():
454+
assert peer.id is not None
455+
if self.is_peer_connected(peer.id):
456+
continue
457+
dt = now - peer.last_seen
458+
if dt > self.max_peer_unseen_dt:
459+
to_be_removed.append(peer)
460+
461+
for peer in to_be_removed:
462+
self.peer_storage.remove(peer)
463+
443464
def reconnect_to_all(self) -> None:
444465
""" It is called by the `lc_reconnect` timer and tries to connect to all known
445466
peers.
446467
447468
TODO(epnichols): Should we always connect to *all*? Should there be a max #?
448469
"""
470+
self.peers_cleanup()
449471
# when we have no connected peers left, run the discovery process again
450472
assert self.manager is not None
451473
if len(self.connected_peers) < 1:

hathor/p2p/peer_id.py

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import base64
1616
import hashlib
1717
from enum import Enum
18+
from math import inf
1819
from typing import TYPE_CHECKING, Any, Generator, Optional, cast
1920

2021
from cryptography import x509
@@ -61,6 +62,7 @@ class PeerId:
6162
retry_timestamp: int # should only try connecting to this peer after this timestamp
6263
retry_interval: int # how long to wait for next connection retry. It will double for each failure
6364
retry_attempts: int # how many retries were made
65+
last_seen: float # last time this peer was seen
6466
flags: set[str]
6567

6668
def __init__(self, auto_generate_keys: bool = True) -> None:
@@ -73,6 +75,7 @@ def __init__(self, auto_generate_keys: bool = True) -> None:
7375
self.retry_timestamp = 0
7476
self.retry_interval = 5
7577
self.retry_attempts = 0
78+
self.last_seen = inf
7679
self.flags = set()
7780
self._certificate_options: Optional[CertificateOptions] = None
7881

hathor/p2p/protocol.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,10 @@ def recv_message(self, cmd: ProtocolMessages, payload: str) -> Optional[Deferred
299299
"""
300300
assert self.state is not None
301301

302-
self.last_message = self.reactor.seconds()
302+
now = self.reactor.seconds()
303+
self.last_message = now
304+
if self.peer is not None:
305+
self.peer.last_seen = now
303306
self.reset_idle_timeout()
304307

305308
if not self.ratelimit.add_hit(self.RateLimitKeys.GLOBAL):

hathor/p2p/resources/status.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import time
16-
1715
import hathor
1816
from hathor.api_util import Resource, set_cors
1917
from hathor.cli.openapi_files.register import register_resource
@@ -34,11 +32,14 @@ class StatusResource(Resource):
3432
def __init__(self, manager):
3533
self._settings = get_settings()
3634
self.manager = manager
35+
self.reactor = manager.reactor
3736

3837
def render_GET(self, request):
3938
request.setHeader(b'content-type', b'application/json; charset=utf-8')
4039
set_cors(request, 'GET')
4140

41+
now = self.reactor.seconds()
42+
4243
connecting_peers = []
4344
# TODO: refactor as not to use a private item
4445
for endpoint, deferred in self.manager.connections.connecting_peers.items():
@@ -53,7 +54,7 @@ def render_GET(self, request):
5354
handshaking_peers.append({
5455
'address': '{}:{}'.format(remote.host, remote.port),
5556
'state': conn.state.state_name,
56-
'uptime': time.time() - conn.connection_time,
57+
'uptime': now - conn.connection_time,
5758
'app_version': conn.app_version,
5859
})
5960

@@ -65,12 +66,13 @@ def render_GET(self, request):
6566
connected_peers.append({
6667
'id': conn.peer.id,
6768
'app_version': conn.app_version,
68-
'uptime': time.time() - conn.connection_time,
69+
'current_time': now,
70+
'uptime': now - conn.connection_time,
6971
'address': '{}:{}'.format(remote.host, remote.port),
7072
'state': conn.state.state_name,
7173
# 'received_bytes': conn.received_bytes,
7274
'rtt': list(conn.state.rtt_window),
73-
'last_message': time.time() - conn.last_message,
75+
'last_message': now - conn.last_message,
7476
'plugins': status,
7577
'warning_flags': [flag.value for flag in conn.warning_flags],
7678
'protocol_version': str(conn.sync_version),
@@ -82,6 +84,7 @@ def render_GET(self, request):
8284
known_peers.append({
8385
'id': peer.id,
8486
'entrypoints': peer.entrypoints,
87+
'last_seen': now - peer.last_seen,
8588
'flags': [flag.value for flag in peer.flags],
8689
})
8790

@@ -103,7 +106,7 @@ def render_GET(self, request):
103106
'app_version': app,
104107
'state': self.manager.state.value,
105108
'network': self.manager.network,
106-
'uptime': time.time() - self.manager.start_time,
109+
'uptime': now - self.manager.start_time,
107110
'entrypoints': self.manager.connections.my_peer.entrypoints,
108111
},
109112
'peers_whitelist': self.manager.peers_whitelist,

hathor/p2p/states/ready.py

+25-14
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ def __init__(self, protocol: 'HathorProtocol') -> None:
4747
self.lc_ping = LoopingCall(self.send_ping_if_necessary)
4848
self.lc_ping.clock = self.reactor
4949

50+
# LC to send GET_PEERS every once in a while.
51+
self.lc_get_peers = LoopingCall(self.send_get_peers)
52+
self.lc_get_peers.clock = self.reactor
53+
self.get_peers_interval: int = 5 * 60 # Once every 5 minutes.
54+
5055
# Minimum interval between PING messages (in seconds).
5156
self.ping_interval: int = 3
5257

@@ -110,6 +115,8 @@ def on_enter(self) -> None:
110115
self.protocol.on_peer_ready()
111116

112117
self.lc_ping.start(1, now=False)
118+
119+
self.lc_get_peers.start(self.get_peers_interval, now=False)
113120
self.send_get_peers()
114121

115122
if self.lc_get_best_blockchain is not None:
@@ -121,6 +128,9 @@ def on_exit(self) -> None:
121128
if self.lc_ping.running:
122129
self.lc_ping.stop()
123130

131+
if self.lc_get_peers.running:
132+
self.lc_get_peers.stop()
133+
124134
if self.lc_get_best_blockchain is not None and self.lc_get_best_blockchain.running:
125135
self.lc_get_best_blockchain.stop()
126136

@@ -146,22 +156,23 @@ def handle_get_peers(self, payload: str) -> None:
146156
""" Executed when a GET-PEERS command is received. It just responds with
147157
a list of all known peers.
148158
"""
149-
if self.protocol.connections:
150-
self.send_peers(self.protocol.connections.iter_ready_connections())
159+
for peer in self.protocol.connections.peer_storage.values():
160+
self.send_peers([peer])
151161

152-
def send_peers(self, connections: Iterable['HathorProtocol']) -> None:
153-
""" Send a PEERS command with a list of all connected peers.
162+
def send_peers(self, peer_list: Iterable['PeerId']) -> None:
163+
""" Send a PEERS command with a list of peers.
154164
"""
155-
peers = []
156-
for conn in connections:
157-
assert conn.peer is not None
158-
peers.append({
159-
'id': conn.peer.id,
160-
'entrypoints': conn.peer.entrypoints,
161-
'last_message': conn.last_message,
162-
})
163-
self.send_message(ProtocolMessages.PEERS, json_dumps(peers))
164-
self.log.debug('send peers', peers=peers)
165+
data = []
166+
for peer in peer_list:
167+
if peer.entrypoints:
168+
data.append({
169+
'id': peer.id,
170+
'entrypoints': peer.entrypoints,
171+
})
172+
if not data:
173+
return
174+
self.send_message(ProtocolMessages.PEERS, json_dumps(data))
175+
self.log.debug('send peers', peers=data)
165176

166177
def handle_peers(self, payload: str) -> None:
167178
""" Executed when a PEERS command is received. It updates the list

0 commit comments

Comments
 (0)