@@ -338,9 +338,7 @@ def has_synced_peer(self) -> bool:
338
338
"""
339
339
connections = list (self .iter_ready_connections ())
340
340
for conn in connections :
341
- assert conn .state is not None
342
- assert isinstance (conn .state , ReadyState )
343
- if conn .state .is_synced ():
341
+ if conn .is_synced ():
344
342
return True
345
343
return False
346
344
@@ -357,9 +355,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
357
355
connections = list (self .iter_ready_connections ())
358
356
self .rng .shuffle (connections )
359
357
for conn in connections :
360
- assert conn .state is not None
361
- assert isinstance (conn .state , ReadyState )
362
- conn .state .send_tx_to_peer (tx )
358
+ conn .send_tx_to_peer (tx )
363
359
364
360
def disconnect_all_peers (self , * , force : bool = False ) -> None :
365
361
"""Disconnect all peers."""
@@ -396,12 +392,10 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
396
392
397
393
def on_peer_ready (self , protocol : HathorProtocol ) -> None :
398
394
"""Called when a peer is ready."""
399
- assert protocol .peer is not None
400
- self .verified_peer_storage .add_or_replace (protocol .peer )
401
- assert protocol .peer .id is not None
402
-
395
+ protocol_peer = protocol .get_peer ()
396
+ self .verified_peer_storage .add_or_replace (protocol_peer )
403
397
self .handshaking_peers .remove (protocol )
404
- self .unverified_peer_storage .pop (protocol . peer .id , None )
398
+ self .unverified_peer_storage .pop (protocol_peer .id , None )
405
399
406
400
# we emit the event even if it's a duplicate peer as a matching
407
401
# NETWORK_PEER_DISCONNECTED will be emitted regardless
@@ -411,7 +405,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
411
405
peers_count = self ._get_peers_count ()
412
406
)
413
407
414
- if protocol . peer .id in self .connected_peers :
408
+ if protocol_peer .id in self .connected_peers :
415
409
# connected twice to same peer
416
410
self .log .warn ('duplicate connection to peer' , protocol = protocol )
417
411
conn = self .get_connection_to_drop (protocol )
@@ -420,35 +414,35 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
420
414
# the new connection is being dropped, so don't save it to connected_peers
421
415
return
422
416
423
- self .connected_peers [protocol . peer .id ] = protocol
417
+ self .connected_peers [protocol_peer .id ] = protocol
424
418
425
419
# In case it was a retry, we must reset the data only here, after it gets ready
426
- protocol . peer .info .reset_retry_timestamp ()
420
+ protocol_peer .info .reset_retry_timestamp ()
427
421
428
422
if len (self .connected_peers ) <= self .MAX_ENABLED_SYNC :
429
423
protocol .enable_sync ()
430
424
431
- if protocol . peer .id in self .always_enable_sync :
425
+ if protocol_peer .id in self .always_enable_sync :
432
426
protocol .enable_sync ()
433
427
434
428
# Notify other peers about this new peer connection.
435
- self .relay_peer_to_ready_connections (protocol . peer )
429
+ self .relay_peer_to_ready_connections (protocol_peer )
436
430
437
431
def relay_peer_to_ready_connections (self , peer : PublicPeer ) -> None :
438
432
"""Relay peer to all ready connections."""
439
433
for conn in self .iter_ready_connections ():
440
- if conn .peer == peer :
434
+ if conn .get_peer () == peer :
441
435
continue
442
- assert isinstance (conn .state , ReadyState )
443
- conn .state .send_peers ([peer ])
436
+ conn .send_peers ([peer ])
444
437
445
438
def on_peer_disconnect (self , protocol : HathorProtocol ) -> None :
446
439
"""Called when a peer disconnect."""
447
440
self .connections .discard (protocol )
448
441
if protocol in self .handshaking_peers :
449
442
self .handshaking_peers .remove (protocol )
450
- if protocol ._peer is not None :
451
- existing_protocol = self .connected_peers .pop (protocol .peer .id , None )
443
+ protocol_peer = protocol .get_peer_if_set ()
444
+ if protocol_peer is not None :
445
+ existing_protocol = self .connected_peers .pop (protocol_peer .id , None )
452
446
if existing_protocol is None :
453
447
# in this case, the connection was closed before it got to READY state
454
448
return
@@ -458,7 +452,7 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
458
452
# A check for duplicate connections is done during PEER_ID state, but there's still a
459
453
# chance it can happen if both connections start at the same time and none of them has
460
454
# reached READY state while the other is on PEER_ID state
461
- self .connected_peers [protocol . peer .id ] = existing_protocol
455
+ self .connected_peers [protocol_peer .id ] = existing_protocol
462
456
self .pubsub .publish (
463
457
HathorEvents .NETWORK_PEER_DISCONNECTED ,
464
458
protocol = protocol ,
@@ -480,8 +474,9 @@ def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]:
480
474
for connecting_peer in self .connecting_peers .values ():
481
475
yield connecting_peer .entrypoint
482
476
for protocol in self .handshaking_peers :
483
- if protocol .entrypoint is not None :
484
- yield protocol .entrypoint
477
+ protocol_entrypoint = protocol .get_entrypoint ()
478
+ if protocol_entrypoint is not None :
479
+ yield protocol_entrypoint
485
480
else :
486
481
self .log .warn ('handshaking protocol has empty connection string' , protocol = protocol )
487
482
@@ -723,30 +718,28 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
723
718
We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
724
719
on the peer id string is used for this comparison.
725
720
"""
726
- assert protocol .peer is not None
727
- assert protocol .peer .id is not None
728
- assert protocol .my_peer .id is not None
729
- other_connection = self .connected_peers [protocol .peer .id ]
730
- if bytes (protocol .my_peer .id ) > bytes (protocol .peer .id ):
721
+ protocol_peer = protocol .get_peer ()
722
+ other_connection = self .connected_peers [protocol_peer .id ]
723
+ if bytes (self .my_peer .id ) > bytes (protocol_peer .id ):
731
724
# connection started by me is kept
732
- if not protocol .inbound :
725
+ if not protocol .is_inbound () :
733
726
# other connection is dropped
734
727
return other_connection
735
728
else :
736
729
# this was started by peer, so drop it
737
730
return protocol
738
731
else :
739
732
# connection started by peer is kept
740
- if not protocol .inbound :
733
+ if not protocol .is_inbound () :
741
734
return protocol
742
735
else :
743
736
return other_connection
744
737
745
738
def drop_connection (self , protocol : HathorProtocol ) -> None :
746
739
""" Drop a connection
747
740
"""
748
- assert protocol .peer is not None
749
- self .log .debug ('dropping connection' , peer_id = protocol . peer .id , protocol = type (protocol ).__name__ )
741
+ protocol_peer = protocol .get_peer ()
742
+ self .log .debug ('dropping connection' , peer_id = protocol_peer .id , protocol = type (protocol ).__name__ )
750
743
protocol .send_error_and_close_connection ('Connection droped' )
751
744
752
745
def drop_connection_by_peer_id (self , peer_id : PeerId ) -> None :
@@ -843,3 +836,17 @@ def reload_entrypoints_and_connections(self) -> None:
843
836
self .log .warn ('Killing all connections and resetting entrypoints...' )
844
837
self .disconnect_all_peers (force = True )
845
838
self .my_peer .reload_entrypoints_from_source_file ()
839
+
840
+ def get_peers_whitelist (self ) -> list [PeerId ]:
841
+ assert self .manager is not None
842
+ return self .manager .peers_whitelist
843
+
844
+ def get_verified_peers (self ) -> Iterable [PublicPeer ]:
845
+ return self .verified_peer_storage .values ()
846
+
847
+ def get_randbytes (self , n : int ) -> bytes :
848
+ return self .rng .randbytes (n )
849
+
850
+ def is_peer_whitelisted (self , peer_id : PeerId ) -> bool :
851
+ assert self .manager is not None
852
+ return peer_id in self .manager .peers_whitelist
0 commit comments