16
16
17
17
from structlog import get_logger
18
18
from twisted .internet import endpoints
19
+ from twisted .internet .address import IPv4Address , IPv6Address
19
20
from twisted .internet .defer import Deferred
20
- from twisted .internet .interfaces import IProtocolFactory , IStreamClientEndpoint , IStreamServerEndpoint
21
+ from twisted .internet .interfaces import IListeningPort , IProtocolFactory , IStreamClientEndpoint
21
22
from twisted .internet .task import LoopingCall
22
23
from twisted .protocols .tls import TLSMemoryBIOFactory , TLSMemoryBIOProtocol
23
24
from twisted .python .failure import Failure
@@ -108,8 +109,11 @@ def __init__(self,
108
109
109
110
self .network = network
110
111
111
- # List of addresses to listen for new connections (eg: [tcp:8000])
112
- self .listen_addresses : list [str ] = []
112
+ # List of address descriptions to listen for new connections (eg: [tcp:8000])
113
+ self .listen_address_descriptions : list [str ] = []
114
+
115
+ # List of actual IP address instances to listen for new connections
116
+ self ._listen_addresses : list [IPv4Address | IPv6Address ] = []
113
117
114
118
# List of peer discovery methods.
115
119
self .peer_discoveries : list [PeerDiscovery ] = []
@@ -239,9 +243,9 @@ def set_manager(self, manager: 'HathorManager') -> None:
239
243
self .log .debug ('enable sync-v2 indexes' )
240
244
indexes .enable_mempool_index ()
241
245
242
- def add_listen_address (self , addr : str ) -> None :
246
+ def add_listen_address_description (self , addr : str ) -> None :
243
247
"""Add address to listen for incoming connections."""
244
- self .listen_addresses .append (addr )
248
+ self .listen_address_descriptions .append (addr )
245
249
246
250
def add_peer_discovery (self , peer_discovery : PeerDiscovery ) -> None :
247
251
"""Add a peer discovery method."""
@@ -279,7 +283,7 @@ def start(self) -> None:
279
283
if self ._settings .ENABLE_PEER_WHITELIST :
280
284
self ._start_whitelist_reconnect ()
281
285
282
- for description in self .listen_addresses :
286
+ for description in self .listen_address_descriptions :
283
287
self .listen (description )
284
288
285
289
self .do_discovery ()
@@ -635,7 +639,7 @@ def connect_to(self, description: str, peer: Optional[PeerId] = None, use_ssl: O
635
639
peers_count = self ._get_peers_count ()
636
640
)
637
641
638
- def listen (self , description : str , use_ssl : Optional [bool ] = None ) -> IStreamServerEndpoint :
642
+ def listen (self , description : str , use_ssl : Optional [bool ] = None ) -> None :
639
643
""" Start to listen for new connection according to the description.
640
644
641
645
If `ssl` is True, then the connection will be wraped by a TLS.
@@ -661,20 +665,43 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> IStreamSer
661
665
662
666
factory = NetfilterFactory (self , factory )
663
667
664
- self .log .info ('listen on' , endpoint = description )
665
- endpoint .listen (factory )
668
+ self .log .info ('trying to listen on' , endpoint = description )
669
+ deferred : Deferred [IListeningPort ] = endpoint .listen (factory )
670
+ deferred .addCallback (self ._on_listen_success , description )
671
+
672
+ def _on_listen_success (self , listening_port : IListeningPort , description : str ) -> None :
673
+ """Callback to be called when listening to an endpoint succeeds."""
674
+ self .log .info ('success listening on' , endpoint = description )
675
+ address = listening_port .getHost ()
676
+
677
+ if not isinstance (address , (IPv4Address , IPv6Address )):
678
+ self .log .error (f'unhandled address type for endpoint "{ description } ": { str (type (address ))} ' )
679
+ return
680
+
681
+ self ._listen_addresses .append (address )
666
682
667
- # XXX: endpoint: IStreamServerEndpoint does not intrinsically have a port, but in practice all concrete cases
668
- # that we have will have a _port attribute
669
- port = getattr (endpoint , '_port' , None )
670
683
assert self .manager is not None
671
- if self .manager .hostname and port is not None :
672
- proto , _ , _ = description .partition (':' )
673
- address = '{}://{}:{}' .format (proto , self .manager .hostname , port )
674
- assert self .manager .my_peer is not None
675
- self .manager .my_peer .entrypoints .append (address )
684
+ if self .manager .hostname :
685
+ self ._add_hostname_entrypoint (self .manager .hostname , address )
676
686
677
- return endpoint
687
+ def update_hostname_entrypoints (self , * , old_hostname : str | None , new_hostname : str ) -> None :
688
+ """Add new hostname entrypoints according to the listen addresses, and remove any old entrypoint."""
689
+ assert self .manager is not None
690
+ for address in self ._listen_addresses :
691
+ if old_hostname is not None :
692
+ old_address_str = self ._get_hostname_address_str (old_hostname , address )
693
+ if old_address_str in self .my_peer .entrypoints :
694
+ self .my_peer .entrypoints .remove (old_address_str )
695
+
696
+ self ._add_hostname_entrypoint (new_hostname , address )
697
+
698
+ def _add_hostname_entrypoint (self , hostname : str , address : IPv4Address | IPv6Address ) -> None :
699
+ hostname_address_str = self ._get_hostname_address_str (hostname , address )
700
+ self .my_peer .entrypoints .append (hostname_address_str )
701
+
702
+ @staticmethod
703
+ def _get_hostname_address_str (hostname : str , address : IPv4Address | IPv6Address ) -> str :
704
+ return '{}://{}:{}' .format (address .type , hostname , address .port ).lower ()
678
705
679
706
def get_connection_to_drop (self , protocol : HathorProtocol ) -> HathorProtocol :
680
707
""" When there are duplicate connections, determine which one should be dropped.
@@ -796,3 +823,9 @@ def _sync_rotate_if_needed(self, *, force: bool = False) -> None:
796
823
797
824
for peer_id in info .to_enable :
798
825
self .connected_peers [peer_id ].enable_sync ()
826
+
827
+ def reload_entrypoints_and_connections (self ) -> None :
828
+ """Kill all connections and reload entrypoints from the original peer config file."""
829
+ self .log .warn ('Killing all connections and resetting entrypoints...' )
830
+ self .disconnect_all_peers (force = True )
831
+ self .my_peer .reload_entrypoints_from_source_file ()
0 commit comments