Skip to content

feat(p2p): relay all recently-seen peers for neighbors #782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ def __init__(self,
# List of known peers.
self.peer_storage = PeerStorage() # dict[string (peer.id), PeerId]

# Maximum unseen time before removing a peer (seconds).
self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes

# A timer to try to reconnect to the disconnect known peers.
self.lc_reconnect = LoopingCall(self.reconnect_to_all)
self.lc_reconnect.clock = self.reactor
Expand Down Expand Up @@ -394,11 +397,15 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
protocol.enable_sync()

# Notify other peers about this new peer connection.
self.relay_peer_to_ready_connections(protocol.peer)

def relay_peer_to_ready_connections(self, peer: PeerId) -> None:
"""Relay peer to all ready connections."""
for conn in self.iter_ready_connections():
if conn != protocol:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([protocol])
if conn.peer == peer:
continue
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([peer])

def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
"""Called when a peer disconnect."""
Expand Down Expand Up @@ -459,12 +466,28 @@ def on_receive_peer(self, peer: PeerId, origin: Optional[ReadyState] = None) ->
peer = self.received_peer_storage.add_or_merge(peer)
self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))

def peers_cleanup(self) -> None:
"""Clean up aged peers."""
now = self.reactor.seconds()
to_be_removed: list[PeerId] = []
for peer in self.peer_storage.values():
assert peer.id is not None
if self.is_peer_connected(peer.id):
continue
dt = now - peer.last_seen
if dt > self.max_peer_unseen_dt:
to_be_removed.append(peer)

for peer in to_be_removed:
self.peer_storage.remove(peer)

def reconnect_to_all(self) -> None:
""" It is called by the `lc_reconnect` timer and tries to connect to all known
peers.

TODO(epnichols): Should we always connect to *all*? Should there be a max #?
"""
self.peers_cleanup()
# when we have no connected peers left, run the discovery process again
assert self.manager is not None
now = self.reactor.seconds()
Expand Down
3 changes: 3 additions & 0 deletions hathor/p2p/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import base64
import hashlib
from enum import Enum
from math import inf
from typing import TYPE_CHECKING, Any, Generator, Optional, cast

from cryptography import x509
Expand Down Expand Up @@ -61,6 +62,7 @@ class PeerId:
retry_timestamp: int # should only try connecting to this peer after this timestamp
retry_interval: int # how long to wait for next connection retry. It will double for each failure
retry_attempts: int # how many retries were made
last_seen: float # last time this peer was seen
flags: set[str]

def __init__(self, auto_generate_keys: bool = True) -> None:
Expand All @@ -73,6 +75,7 @@ def __init__(self, auto_generate_keys: bool = True) -> None:
self.retry_timestamp = 0
self.retry_interval = 5
self.retry_attempts = 0
self.last_seen = inf
self.flags = set()
self._certificate_options: Optional[CertificateOptions] = None

Expand Down
5 changes: 4 additions & 1 deletion hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ def recv_message(self, cmd: ProtocolMessages, payload: str) -> Optional[Deferred
"""
assert self.state is not None

self.last_message = self.reactor.seconds()
now = self.reactor.seconds()
self.last_message = now
if self.peer is not None:
self.peer.last_seen = now
self.reset_idle_timeout()

if not self.ratelimit.add_hit(self.RateLimitKeys.GLOBAL):
Expand Down
15 changes: 9 additions & 6 deletions hathor/p2p/resources/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import hathor
from hathor.api_util import Resource, set_cors
from hathor.cli.openapi_files.register import register_resource
Expand All @@ -34,11 +32,14 @@ class StatusResource(Resource):
def __init__(self, manager):
self._settings = get_settings()
self.manager = manager
self.reactor = manager.reactor

def render_GET(self, request):
request.setHeader(b'content-type', b'application/json; charset=utf-8')
set_cors(request, 'GET')

now = self.reactor.seconds()

connecting_peers = []
# TODO: refactor as not to use a private item
for endpoint, deferred in self.manager.connections.connecting_peers.items():
Expand All @@ -53,7 +54,7 @@ def render_GET(self, request):
handshaking_peers.append({
'address': '{}:{}'.format(remote.host, remote.port),
'state': conn.state.state_name,
'uptime': time.time() - conn.connection_time,
'uptime': now - conn.connection_time,
'app_version': conn.app_version,
})

Expand All @@ -65,12 +66,13 @@ def render_GET(self, request):
connected_peers.append({
'id': conn.peer.id,
'app_version': conn.app_version,
'uptime': time.time() - conn.connection_time,
'current_time': now,
'uptime': now - conn.connection_time,
'address': '{}:{}'.format(remote.host, remote.port),
'state': conn.state.state_name,
# 'received_bytes': conn.received_bytes,
'rtt': list(conn.state.rtt_window),
'last_message': time.time() - conn.last_message,
'last_message': now - conn.last_message,
'plugins': status,
'warning_flags': [flag.value for flag in conn.warning_flags],
'protocol_version': str(conn.sync_version),
Expand All @@ -82,6 +84,7 @@ def render_GET(self, request):
known_peers.append({
'id': peer.id,
'entrypoints': peer.entrypoints,
'last_seen': now - peer.last_seen,
'flags': [flag.value for flag in peer.flags],
})

Expand All @@ -103,7 +106,7 @@ def render_GET(self, request):
'app_version': app,
'state': self.manager.state.value,
'network': self.manager.network,
'uptime': time.time() - self.manager.start_time,
'uptime': now - self.manager.start_time,
'entrypoints': self.manager.connections.my_peer.entrypoints,
},
'peers_whitelist': self.manager.peers_whitelist,
Expand Down
37 changes: 23 additions & 14 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def __init__(self, protocol: 'HathorProtocol') -> None:
self.lc_ping = LoopingCall(self.send_ping_if_necessary)
self.lc_ping.clock = self.reactor

# LC to send GET_PEERS every once in a while.
self.lc_get_peers = LoopingCall(self.send_get_peers)
self.lc_get_peers.clock = self.reactor
self.get_peers_interval: int = 5 * 60 # Once every 5 minutes.

# Minimum interval between PING messages (in seconds).
self.ping_interval: int = 3

Expand Down Expand Up @@ -110,6 +115,8 @@ def on_enter(self) -> None:
self.protocol.on_peer_ready()

self.lc_ping.start(1, now=False)

self.lc_get_peers.start(self.get_peers_interval, now=False)
self.send_get_peers()

if self.lc_get_best_blockchain is not None:
Expand All @@ -121,6 +128,9 @@ def on_exit(self) -> None:
if self.lc_ping.running:
self.lc_ping.stop()

if self.lc_get_peers.running:
self.lc_get_peers.stop()

if self.lc_get_best_blockchain is not None and self.lc_get_best_blockchain.running:
self.lc_get_best_blockchain.stop()

Expand All @@ -146,22 +156,21 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
if self.protocol.connections:
self.send_peers(self.protocol.connections.iter_ready_connections())
for peer in self.protocol.connections.peer_storage.values():
self.send_peers([peer])

def send_peers(self, connections: Iterable['HathorProtocol']) -> None:
""" Send a PEERS command with a list of all connected peers.
def send_peers(self, peer_list: Iterable['PeerId']) -> None:
""" Send a PEERS command with a list of peers.
"""
peers = []
for conn in connections:
assert conn.peer is not None
peers.append({
'id': conn.peer.id,
'entrypoints': conn.peer.entrypoints,
'last_message': conn.last_message,
})
self.send_message(ProtocolMessages.PEERS, json_dumps(peers))
self.log.debug('send peers', peers=peers)
data = []
for peer in peer_list:
if peer.entrypoints:
data.append({
'id': peer.id,
'entrypoints': peer.entrypoints,
})
self.send_message(ProtocolMessages.PEERS, json_dumps(data))
self.log.debug('send peers', peers=data)

def handle_peers(self, payload: str) -> None:
""" Executed when a PEERS command is received. It updates the list
Expand Down
16 changes: 9 additions & 7 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,14 @@ def handle_get_tips(self, payload: str) -> None:

def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None:
"""Try to send a TIPS message. If rate limit has been reached, it schedules to send it later."""

# Filter for active delayed calls once one is executing
self._send_tips_call_later = [
call_later
for call_later in self._send_tips_call_later
if call_later.active()
]

if not self.global_rate_limiter.add_hit(self.GlobalRateLimiter.SEND_TIPS):
self.log.debug('send_tips throttled')
if len(self._send_tips_call_later) >= self._settings.MAX_GET_TIPS_DELAYED_CALLS:
Expand All @@ -635,18 +643,12 @@ def send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fals
)
)
return

self._send_tips(timestamp, include_hashes, offset)

def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = False, offset: int = 0) -> None:
""" Send a TIPS message.
"""
# Filter for active delayed calls once one is executing
self._send_tips_call_later = [
call_later
for call_later in self._send_tips_call_later
if call_later.active()
]

if timestamp is None:
timestamp = self.manager.tx_storage.latest_timestamp

Expand Down
18 changes: 10 additions & 8 deletions tests/p2p/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,18 @@ def test_two_connections(self):
self.assertAndStepConn(self.conn, b'^GET-TIPS')
self.assertAndStepConn(self.conn, b'^PING')

for _ in range(19):
for _ in range(20):
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN')

# peer1 should now send a PEERS with the new peer that just connected
self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN')
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS')
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS')
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END')
self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG')
self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN')
self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS')
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^GET-BEST-BLOCKCHAIN')
self.assertAndStepConn(self.conn, b'^GET-PEERS', b'^GET-PEERS')
self.assertAndStepConn(self.conn, b'^PEERS', b'^GET-BEST-BLOCKCHAIN')
self.assertAndStepConn(self.conn, b'^GET-BEST-BLOCKCHAIN', b'^TIPS')
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS')
self.assertAndStepConn(self.conn, b'^TIPS', b'^TIPS-END')
self.assertAndStepConn(self.conn, b'^TIPS-END', b'^PONG')
self.assertAndStepConn(self.conn, b'^PONG', b'^BEST-BLOCKCHAIN')
self.assertIsConnected()

@inlineCallbacks
Expand Down