Skip to content

Commit e305645

Browse files
msbroglijansegre
authored andcommitted
feat(p2p): relay all recently-seen peers for neighbors.
1 parent 889bf5d commit e305645

File tree

7 files changed

+85
-40
lines changed

7 files changed

+85
-40
lines changed

hathor/p2p/manager.py

+27-4
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ def __init__(self,
157157
# List of known peers.
158158
self.peer_storage = PeerStorage() # dict[string (peer.id), PeerId]
159159

160+
# Maximum unseen time before removing a peer (seconds).
161+
self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes
162+
160163
# A timer to try to reconnect to the disconnect known peers.
161164
self.lc_reconnect = LoopingCall(self.reconnect_to_all)
162165
self.lc_reconnect.clock = self.reactor
@@ -394,11 +397,15 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
394397
protocol.enable_sync()
395398

396399
# Notify other peers about this new peer connection.
400+
self.relay_peer_to_ready_connections(protocol.peer)
401+
402+
def relay_peer_to_ready_connections(self, peer: PeerId) -> None:
403+
"""Relay peer to all ready connections."""
397404
for conn in self.iter_ready_connections():
398-
if conn != protocol:
399-
assert conn.state is not None
400-
assert isinstance(conn.state, ReadyState)
401-
conn.state.send_peers([protocol])
405+
if conn.peer == peer:
406+
continue
407+
assert isinstance(conn.state, ReadyState)
408+
conn.state.send_peers([peer])
402409

403410
def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
404411
"""Called when a peer disconnect."""
@@ -459,12 +466,28 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) ->
459466
peer = self.received_peer_storage.add_or_merge(peer)
460467
self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))
461468

469+
def peers_cleanup(self) -> None:
470+
"""Clean up aged peers."""
471+
now = self.reactor.seconds()
472+
to_be_removed: list[PeerId] = []
473+
for peer in self.peer_storage.values():
474+
assert peer.id is not None
475+
if self.is_peer_connected(peer.id):
476+
continue
477+
dt = now - peer.last_seen
478+
if dt > self.max_peer_unseen_dt:
479+
to_be_removed.append(peer)
480+
481+
for peer in to_be_removed:
482+
self.peer_storage.remove(peer)
483+
462484
def reconnect_to_all(self) -> None:
463485
""" It is called by the `lc_reconnect` timer and tries to connect to all known
464486
peers.
465487
466488
TODO(epnichols): Should we always connect to *all*? Should there be a max #?
467489
"""
490+
self.peers_cleanup()
468491
# when we have no connected peers left, run the discovery process again
469492
assert self.manager is not None
470493
now = self.reactor.seconds()

hathor/p2p/peer_id.py

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import base64
1616
import hashlib
1717
from enum import Enum
18+
from math import inf
1819
from typing import TYPE_CHECKING, Any, Generator, Optional, cast
1920

2021
from cryptography import x509
@@ -61,6 +62,7 @@ class PeerId:
6162
retry_timestamp: int # should only try connecting to this peer after this timestamp
6263
retry_interval: int # how long to wait for next connection retry. It will double for each failure
6364
retry_attempts: int # how many retries were made
65+
last_seen: float # last time this peer was seen
6466
flags: set[str]
6567

6668
def __init__(self, auto_generate_keys: bool = True) -> None:
@@ -73,6 +75,7 @@ def __init__(self, auto_generate_keys: bool = True) -> None:
7375
self.retry_timestamp = 0
7476
self.retry_interval = 5
7577
self.retry_attempts = 0
78+
self.last_seen = inf
7679
self.flags = set()
7780
self._certificate_options: Optional[CertificateOptions] = None
7881

hathor/p2p/protocol.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,10 @@ def recv_message(self, cmd: ProtocolMessages, payload: str) -> Optional[Deferred
293293
"""
294294
assert self.state is not None
295295

296-
self.last_message = self.reactor.seconds()
296+
now = self.reactor.seconds()
297+
self.last_message = now
298+
if self.peer is not None:
299+
self.peer.last_seen = now
297300
self.reset_idle_timeout()
298301

299302
if not self.ratelimit.add_hit(self.RateLimitKeys.GLOBAL):

hathor/p2p/resources/status.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import time
16-
1715
import hathor
1816
from hathor.api_util import Resource, set_cors
1917
from hathor.cli.openapi_files.register import register_resource
@@ -34,11 +32,14 @@ class StatusResource(Resource):
3432
def __init__(self, manager):
3533
self._settings = get_settings()
3634
self.manager = manager
35+
self.reactor = manager.reactor
3736

3837
def render_GET(self, request):
3938
request.setHeader(b'content-type', b'application/json; charset=utf-8')
4039
set_cors(request, 'GET')
4140

41+
now = self.reactor.seconds()
42+
4243
connecting_peers = []
4344
# TODO: refactor as not to use a private item
4445
for endpoint, deferred in self.manager.connections.connecting_peers.items():
@@ -53,7 +54,7 @@ def render_GET(self, request):
5354
handshaking_peers.append({
5455
'address': '{}:{}'.format(remote.host, remote.port),
5556
'state': conn.state.state_name,
56-
'uptime': time.time() - conn.connection_time,
57+
'uptime': now - conn.connection_time,
5758
'app_version': conn.app_version,
5859
})
5960

@@ -65,12 +66,13 @@ def render_GET(self, request):
6566
connected_peers.append({
6667
'id': conn.peer.id,
6768
'app_version': conn.app_version,
68-
'uptime': time.time() - conn.connection_time,
69+
'current_time': now,
70+
'uptime': now - conn.connection_time,
6971
'address': '{}:{}'.format(remote.host, remote.port),
7072
'state': conn.state.state_name,
7173
# 'received_bytes': conn.received_bytes,
7274
'rtt': list(conn.state.rtt_window),
73-
'last_message': time.time() - conn.last_message,
75+
'last_message': now - conn.last_message,
7476
'plugins': status,
7577
'warning_flags': [flag.value for flag in conn.warning_flags],
7678
'protocol_version': str(conn.sync_version),
@@ -82,6 +84,7 @@ def render_GET(self, request):
8284
known_peers.append({
8385
'id': peer.id,
8486
'entrypoints': peer.entrypoints,
87+
'last_seen': now - peer.last_seen,
8588
'flags': [flag.value for flag in peer.flags],
8689
})
8790

@@ -103,7 +106,7 @@ def render_GET(self, request):
103106
'app_version': app,
104107
'state': self.manager.state.value,
105108
'network': self.manager.network,
106-
'uptime': time.time() - self.manager.start_time,
109+
'uptime': now - self.manager.start_time,
107110
'entrypoints': self.manager.connections.my_peer.entrypoints,
108111
},
109112
'peers_whitelist': self.manager.peers_whitelist,

hathor/p2p/states/ready.py

+23-14
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ def __init__(self, protocol: 'HathorProtocol') -> None:
4747
self.lc_ping = LoopingCall(self.send_ping_if_necessary)
4848
self.lc_ping.clock = self.reactor
4949

50+
# LC to send GET_PEERS every once in a while.
51+
self.lc_get_peers = LoopingCall(self.send_get_peers)
52+
self.lc_get_peers.clock = self.reactor
53+
self.get_peers_interval: int = 5 * 60 # Once every 5 minutes.
54+
5055
# Minimum interval between PING messages (in seconds).
5156
self.ping_interval: int = 3
5257

@@ -110,6 +115,8 @@ def on_enter(self) -> None:
110115
self.protocol.on_peer_ready()
111116

112117
self.lc_ping.start(1, now=False)
118+
119+
self.lc_get_peers.start(self.get_peers_interval, now=False)
113120
self.send_get_peers()
114121

115122
if self.lc_get_best_blockchain is not None:
@@ -121,6 +128,9 @@ def on_exit(self) -> None:
121128
if self.lc_ping.running:
122129
self.lc_ping.stop()
123130

131+
if self.lc_get_peers.running:
132+
self.lc_get_peers.stop()
133+
124134
if self.lc_get_best_blockchain is not None and self.lc_get_best_blockchain.running:
125135
self.lc_get_best_blockchain.stop()
126136

@@ -146,22 +156,21 @@ def handle_get_peers(self, payload: str) -> None:
146156
""" Executed when a GET-PEERS command is received. It just responds with
147157
a list of all known peers.
148158
"""
149-
if self.protocol.connections:
150-
self.send_peers(self.protocol.connections.iter_ready_connections())
159+
for peer in self.protocol.connections.peer_storage.values():
160+
self.send_peers([peer])
151161

152-
def send_peers(self, connections: Iterable['HathorProtocol']) -> None:
153-
""" Send a PEERS command with a list of all connected peers.
162+
def send_peers(self, peer_list: Iterable['PeerId']) -> None:
163+
""" Send a PEERS command with a list of peers.
154164
"""
155-
peers = []
156-
for conn in connections:
157-
assert conn.peer is not None
158-
peers.append({
159-
'id': conn.peer.id,
160-
'entrypoints': conn.peer.entrypoints,
161-
'last_message': conn.last_message,
162-
})
163-
self.send_message(ProtocolMessages.PEERS, json_dumps(peers))
164-
self.log.debug('send peers', peers=peers)
165+
data = []
166+
for peer in peer_list:
167+
if peer.entrypoints:
168+
data.append({
169+
'id': peer.id,
170+
'entrypoints': peer.entrypoints,
171+
})
172+
self.send_message(ProtocolMessages.PEERS, json_dumps(data))
173+
self.log.debug('send peers', peers=data)
165174

166175
def handle_peers(self, payload: str) -> None:
167176
""" Executed when a PEERS command is received. It updates the list

hathor/p2p/sync_v1/agent.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,14 @@ def handle_get_tips(self, payload: str) -> None:
622622

623623
def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None:
624624
"""Try to send a TIPS message. If rate limit has been reached, it schedules to send it later."""
625+
626+
# Filter for active delayed calls once one is executing
627+
self._send_tips_call_later = [
628+
call_later
629+
for call_later in self._send_tips_call_later
630+
if call_later.active()
631+
]
632+
625633
if not self.global_rate_limiter.add_hit(self.GlobalRateLimiter.SEND_TIPS):
626634
self.log.debug('send_tips throttled')
627635
if len(self._send_tips_call_later) >= self._settings.MAX_GET_TIPS_DELAYED_CALLS:
@@ -635,18 +643,12 @@ def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fals
635643
)
636644
)
637645
return
646+
638647
self._send_tips(timestamp, include_hashes, offset)
639648

640649
def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None:
641650
""" Send a TIPS message.
642651
"""
643-
# Filter for active delayed calls once one is executing
644-
self._send_tips_call_later = [
645-
call_later
646-
for call_later in self._send_tips_call_later
647-
if call_later.active()
648-
]
649-
650652
if timestamp is None:
651653
timestamp = self.manager.tx_storage.latest_timestamp
652654

tests/p2p/test_protocol.py

+10-8
Original file line numberDiff line numberDiff line change
@@ -411,16 +411,18 @@ def test_two_connections(self):
411411
self.assertAndStepConn(self.conn, b'^GET-TIPS')
412412
self.assertAndStepConn(self.conn, b'^PING')
413413

414-
for _ in range(19):
414+
for _ in range(20):
415415
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN')
416416

417-
# peer1 should now send a PEERS with the new peer that just connected
418-
self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN')
419-
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS')
420-
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS')
421-
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END')
422-
self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG')
423-
self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN')
417+
self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS')
418+
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^GET-BEST-BLOCKCHAIN')
419+
self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS')
420+
self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN')
421+
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS')
422+
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS')
423+
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END')
424+
self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG')
425+
self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN')
424426
self.assertIsConnected()
425427

426428
@inlineCallbacks

0 commit comments

Comments
 (0)