12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ from collections import deque
15
16
from typing import TYPE_CHECKING , Any , Iterable , NamedTuple , Optional
16
17
17
18
from structlog import get_logger
30
31
from hathor .p2p .peer_discovery import PeerDiscovery
31
32
from hathor .p2p .peer_endpoint import PeerAddress , PeerEndpoint
32
33
from hathor .p2p .peer_id import PeerId
33
- from hathor .p2p .peer_storage import UnverifiedPeerStorage , VerifiedPeerStorage
34
+ from hathor .p2p .peer_storage import VerifiedPeerStorage
34
35
from hathor .p2p .protocol import HathorProtocol
35
36
from hathor .p2p .rate_limiter import RateLimiter
36
37
from hathor .p2p .states .ready import ReadyState
@@ -81,10 +82,10 @@ class GlobalRateLimiter:
81
82
manager : Optional ['HathorManager' ]
82
83
connections : set [HathorProtocol ]
83
84
connected_peers : dict [PeerId , HathorProtocol ]
85
+ new_connection_from_queue : deque [PeerId ]
84
86
connecting_peers : dict [IStreamClientEndpoint , _ConnectingPeer ]
85
87
handshaking_peers : set [HathorProtocol ]
86
88
whitelist_only : bool
87
- unverified_peer_storage : UnverifiedPeerStorage
88
89
verified_peer_storage : VerifiedPeerStorage
89
90
_sync_factories : dict [SyncVersion , SyncAgentFactory ]
90
91
_enabled_sync_versions : set [SyncVersion ]
@@ -156,12 +157,12 @@ def __init__(
156
157
# List of peers connected and ready to communicate.
157
158
self .connected_peers = {}
158
159
159
- # List of peers received from the network.
160
- # We cannot trust their identity before we connect to them.
161
- self .unverified_peer_storage = UnverifiedPeerStorage ()
160
+ # Queue of ready peer-id's used by connect_to_peer_from_connection_queue to choose the next peer to pull a
161
+ # random new connection from
162
+ self .new_connection_from_queue = deque ()
162
163
163
164
# List of known peers.
164
- self .verified_peer_storage = VerifiedPeerStorage () # dict[string (peer.id), PublicPeer]
165
+ self .verified_peer_storage = VerifiedPeerStorage (rng = self . rng , max_size = self . _settings . MAX_VERIFIED_PEERS )
165
166
166
167
# Maximum unseen time before removing a peer (seconds).
167
168
self .max_peer_unseen_dt : float = 30 * 60 # 30-minutes
@@ -181,6 +182,11 @@ def __init__(
181
182
# Timestamp of the last time sync was updated.
182
183
self ._last_sync_rotate : float = 0.
183
184
185
+ # Connect to new peers in a timed loop, instead of as soon as possible
186
+ self .lc_connect = LoopingCall (self .connect_to_peer_from_connection_queue )
187
+ self .lc_connect .clock = self .reactor
188
+ self .lc_connect_interval = 0.2 # seconds
189
+
184
190
# A timer to try to reconnect to the disconnect known peers.
185
191
if self ._settings .ENABLE_PEER_WHITELIST :
186
192
self .wl_reconnect = LoopingCall (self .update_whitelist )
@@ -272,7 +278,7 @@ def do_discovery(self) -> None:
272
278
Do a discovery and connect on all discovery strategies.
273
279
"""
274
280
for peer_discovery in self .peer_discoveries :
275
- coro = peer_discovery .discover_and_connect (self .connect_to )
281
+ coro = peer_discovery .discover_and_connect (self .connect_to_endpoint )
276
282
Deferred .fromCoroutine (coro )
277
283
278
284
def disable_rate_limiter (self ) -> None :
@@ -293,6 +299,7 @@ def start(self) -> None:
293
299
if self .manager is None :
294
300
raise TypeError ('Class was built incorrectly without a HathorManager.' )
295
301
302
+ self ._start_peer_connect_loop ()
296
303
self .lc_reconnect .start (5 , now = False )
297
304
self .lc_sync_update .start (self .lc_sync_update_interval , now = False )
298
305
@@ -319,7 +326,28 @@ def _handle_whitelist_reconnect_err(self, *args: Any, **kwargs: Any) -> None:
319
326
self .log .error ('whitelist reconnect had an exception. Start looping call again.' , args = args , kwargs = kwargs )
320
327
self .reactor .callLater (30 , self ._start_whitelist_reconnect )
321
328
329
+ def _start_peer_connect_loop (self ) -> None :
330
+ # The deferred returned by the LoopingCall start method
331
+ # executes when the looping call stops running
332
+ # https://docs.twistedmatrix.com/en/stable/api/twisted.internet.task.LoopingCall.html
333
+ d = self .lc_connect .start (self .lc_connect_interval , now = True )
334
+ d .addErrback (self ._handle_peer_connect_err )
335
+
336
+ def _handle_peer_connect_err (self , * args : Any , ** kwargs : Any ) -> None :
337
+ # This method will be called when an exception happens inside the peer connect loop
338
+ # and ends up stopping the looping call.
339
+ # We log the error and start the looping call again.
340
+ self .log .error (
341
+ 'connect_to_peer_from_connection_queue had an exception. Start looping call again.' ,
342
+ args = args ,
343
+ kwargs = kwargs ,
344
+ )
345
+ self .reactor .callLater (self .lc_connect_interval , self ._start_peer_connect_loop )
346
+
322
347
def stop (self ) -> None :
348
+ if self .lc_connect .running :
349
+ self .lc_connect .stop ()
350
+
323
351
if self .lc_reconnect .running :
324
352
self .lc_reconnect .stop ()
325
353
@@ -406,10 +434,10 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
406
434
"""Called when a peer is ready."""
407
435
assert protocol .peer is not None
408
436
self .verified_peer_storage .add_or_replace (protocol .peer )
409
- assert protocol .peer .id is not None
410
437
411
438
self .handshaking_peers .remove (protocol )
412
- self .unverified_peer_storage .pop (protocol .peer .id , None )
439
+ for conn in self .iter_all_connections ():
440
+ conn .unverified_peer_storage .remove (protocol .peer )
413
441
414
442
# we emit the event even if it's a duplicate peer as a matching
415
443
# NETWORK_PEER_DISCONNECTED will be emitted regardless
@@ -419,7 +447,8 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
419
447
peers_count = self ._get_peers_count ()
420
448
)
421
449
422
- if protocol .peer .id in self .connected_peers :
450
+ peer_id = protocol .peer .id
451
+ if peer_id in self .connected_peers :
423
452
# connected twice to same peer
424
453
self .log .warn ('duplicate connection to peer' , protocol = protocol )
425
454
conn = self .get_connection_to_drop (protocol )
@@ -428,15 +457,19 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
428
457
# the new connection is being dropped, so don't save it to connected_peers
429
458
return
430
459
431
- self .connected_peers [protocol .peer .id ] = protocol
460
+ self .connected_peers [peer_id ] = protocol
461
+ if peer_id not in self .new_connection_from_queue :
462
+ self .new_connection_from_queue .append (peer_id )
463
+ else :
464
+ self .log .warn ('peer already in queue' , peer = str (peer_id ))
432
465
433
466
# In case it was a retry, we must reset the data only here, after it gets ready
434
467
protocol .peer .info .reset_retry_timestamp ()
435
468
436
469
if len (self .connected_peers ) <= self .MAX_ENABLED_SYNC :
437
470
protocol .enable_sync ()
438
471
439
- if protocol . peer . id in self .always_enable_sync :
472
+ if peer_id in self .always_enable_sync :
440
473
protocol .enable_sync ()
441
474
442
475
# Notify other peers about this new peer connection.
@@ -456,7 +489,8 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
456
489
if protocol in self .handshaking_peers :
457
490
self .handshaking_peers .remove (protocol )
458
491
if protocol ._peer is not None :
459
- existing_protocol = self .connected_peers .pop (protocol .peer .id , None )
492
+ peer_id = protocol .peer .id
493
+ existing_protocol = self .connected_peers .pop (peer_id , None )
460
494
if existing_protocol is None :
461
495
# in this case, the connection was closed before it got to READY state
462
496
return
@@ -466,7 +500,10 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
466
500
# A check for duplicate connections is done during PEER_ID state, but there's still a
467
501
# chance it can happen if both connections start at the same time and none of them has
468
502
# reached READY state while the other is on PEER_ID state
469
- self .connected_peers [protocol .peer .id ] = existing_protocol
503
+ self .connected_peers [peer_id ] = existing_protocol
504
+ elif peer_id in self .new_connection_from_queue :
505
+ # now we're sure it can be removed from new_connection_from_queue
506
+ self .new_connection_from_queue .remove (peer_id )
470
507
self .pubsub .publish (
471
508
HathorEvents .NETWORK_PEER_DISCONNECTED ,
472
509
protocol = protocol ,
@@ -499,15 +536,6 @@ def is_peer_connected(self, peer_id: PeerId) -> bool:
499
536
"""
500
537
return peer_id in self .connected_peers
501
538
502
- def on_receive_peer (self , peer : UnverifiedPeer , origin : Optional [ReadyState ] = None ) -> None :
503
- """ Update a peer information in our storage, and instantly attempt to connect
504
- to it if it is not connected yet.
505
- """
506
- if peer .id == self .my_peer .id :
507
- return
508
- peer = self .unverified_peer_storage .add_or_merge (peer )
509
- self .connect_to_if_not_connected (peer , int (self .reactor .seconds ()))
510
-
511
539
def peers_cleanup (self ) -> None :
512
540
"""Clean up aged peers."""
513
541
now = self .reactor .seconds ()
@@ -523,11 +551,45 @@ def peers_cleanup(self) -> None:
523
551
for remove_peer in to_be_removed :
524
552
self .verified_peer_storage .remove (remove_peer )
525
553
526
- def reconnect_to_all (self ) -> None :
527
- """ It is called by the `lc_reconnect` timer and tries to connect to all known
528
- peers.
554
+ def connect_to_peer_from_connection_queue (self ) -> None :
555
+ """ It is called by the `lc_connect` looping call and tries to connect to a new peer.
556
+ """
557
+ if not self .new_connection_from_queue :
558
+ self .log .debug ('connection queue is empty' )
559
+ return
560
+ assert self .manager is not None
561
+ self .log .debug ('connect to peer from connection queue' )
562
+ candidate_new_peers : list [UnverifiedPeer ]
563
+ # we don't know if we will find a candidate, so we can't do `while True:`
564
+ for _ in range (len (self .new_connection_from_queue )):
565
+ # for a deque([1, 2, 3, 4]) this will get 1 and modify it to deque([2, 3, 4, 1])
566
+ next_from_peer_id = self .new_connection_from_queue [0 ]
567
+ self .new_connection_from_queue .rotate (- 1 )
568
+
569
+ protocol = self .connected_peers .get (next_from_peer_id )
570
+ if protocol is None :
571
+ self .log .error ('expected protocol not found' , peer_id = str (next_from_peer_id ))
572
+ assert self .new_connection_from_queue .pop () == next_from_peer_id
573
+ continue
574
+ candidate_new_peers = [
575
+ candidate_peer
576
+ for candidate_peer_id , candidate_peer in protocol .unverified_peer_storage .items ()
577
+ if candidate_peer_id not in self .connected_peers or candidate_peer_id not in self .connecting_peers
578
+ ]
579
+ if candidate_new_peers :
580
+ break
581
+ else :
582
+ self .log .debug ('no new peers in the connection queue' )
583
+ # this means we rotated through the whole queue and did not find any candidate
584
+ return
529
585
530
- TODO(epnichols): Should we always connect to *all*? Should there be a max #?
586
+ peer = self .rng .choice (candidate_new_peers )
587
+ self .log .debug ('random peer chosen' , peer = str (peer .id ), entrypoints = peer .info .entrypoints_as_str ())
588
+ now = self .reactor .seconds ()
589
+ self .connect_to_peer (peer , int (now ))
590
+
591
+ def reconnect_to_all (self ) -> None :
592
+ """ It is called by the `lc_reconnect` timer and tries to connect to all known peers.
531
593
"""
532
594
self .peers_cleanup ()
533
595
# when we have no connected peers left, run the discovery process again
@@ -536,10 +598,10 @@ def reconnect_to_all(self) -> None:
536
598
if now - self ._last_discovery >= self .PEER_DISCOVERY_INTERVAL :
537
599
self ._last_discovery = now
538
600
self .do_discovery ()
539
- # We need to use list() here because the dict might change inside connect_to_if_not_connected
601
+ # We need to use list() here because the dict might change inside connect_to_peer
540
602
# when the peer is disconnected and without entrypoint
541
603
for peer in list (self .verified_peer_storage .values ()):
542
- self .connect_to_if_not_connected (peer , int (now ))
604
+ self .connect_to_peer (peer , int (now ))
543
605
544
606
def update_whitelist (self ) -> Deferred [None ]:
545
607
from twisted .web .client import readBody
@@ -582,7 +644,7 @@ def _update_whitelist_cb(self, body: bytes) -> None:
582
644
for peer_id in peers_to_remove :
583
645
self .manager .remove_peer_from_whitelist_and_disconnect (peer_id )
584
646
585
- def connect_to_if_not_connected (self , peer : UnverifiedPeer | PublicPeer , now : int ) -> None :
647
+ def connect_to_peer (self , peer : UnverifiedPeer | PublicPeer , now : int ) -> None :
586
648
""" Attempts to connect if it is not connected to the peer.
587
649
"""
588
650
if not peer .info .entrypoints or (
@@ -602,15 +664,16 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in
602
664
assert peer .id is not None
603
665
if peer .info .can_retry (now ):
604
666
if self .enable_ipv6 and not self .disable_ipv4 :
605
- addr = self .rng .choice (peer .info .entrypoints )
667
+ addr = self .rng .choice (list ( peer .info .entrypoints ) )
606
668
elif self .enable_ipv6 and self .disable_ipv4 :
607
669
addr = self .rng .choice (peer .info .get_ipv6_only_entrypoints ())
608
670
elif not self .enable_ipv6 and not self .disable_ipv4 :
609
671
addr = self .rng .choice (peer .info .get_ipv4_only_entrypoints ())
610
672
else :
611
673
raise ValueError ('IPv4 is disabled and IPv6 is not enabled' )
612
-
613
- self .connect_to (addr .with_id (peer .id ), peer )
674
+ self .connect_to_endpoint (addr .with_id (peer .id ), peer )
675
+ else :
676
+ self .log .debug ('connecting too often, skip retrying' , peer = str (peer .id ))
614
677
615
678
def _connect_to_callback (
616
679
self ,
@@ -628,14 +691,17 @@ def _connect_to_callback(
628
691
protocol .wrappedProtocol .on_outbound_connect (entrypoint , peer )
629
692
self .connecting_peers .pop (endpoint )
630
693
631
- def connect_to (
694
+ def connect_to_endpoint (
632
695
self ,
633
696
entrypoint : PeerEndpoint ,
634
697
peer : UnverifiedPeer | PublicPeer | None = None ,
635
698
use_ssl : bool | None = None ,
636
699
) -> None :
637
- """ Attempt to connect to a peer, even if a connection already exists.
638
- Usually you should call `connect_to_if_not_connected`.
700
+ """ Attempt to connect directly to an endpoint, prefer calling `connect_to_peer` when possible.
701
+
702
+ This method does not take into account the peer's id (since we might not even know it, or have verified it even
703
+ if we know). But this method will check if there's already a connection open to the given endpoint and skip it
704
+ if there is one.
639
705
640
706
If `use_ssl` is True, then the connection will be wraped by a TLS.
641
707
"""
@@ -747,7 +813,7 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname:
747
813
748
814
def _add_hostname_entrypoint (self , hostname : str , address : IPv4Address | IPv6Address ) -> None :
749
815
hostname_entrypoint = PeerAddress .from_hostname_address (hostname , address )
750
- self .my_peer .info .entrypoints .append (hostname_entrypoint )
816
+ self .my_peer .info .entrypoints .add (hostname_entrypoint )
751
817
752
818
def get_connection_to_drop (self , protocol : HathorProtocol ) -> HathorProtocol :
753
819
""" When there are duplicate connections, determine which one should be dropped.
0 commit comments