Skip to content

Commit 4bbb6b7

Browse files
committed
refactor(p2p): have two internal tiers for sync: available and enabled
1 parent 0b32a31 commit 4bbb6b7

File tree

8 files changed

+87
-58
lines changed

8 files changed

+87
-58
lines changed

hathor/builder/builder.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,10 @@ def _get_or_create_rocksdb_storage(self) -> RocksDBStorage:
334334
return self._rocksdb_storage
335335

336336
def _get_p2p_manager(self) -> ConnectionsManager:
337+
from hathor.p2p.sync_v1.factory import SyncV11Factory
338+
from hathor.p2p.sync_v2.factory import SyncV2Factory
339+
from hathor.p2p.sync_version import SyncVersion
340+
337341
enable_ssl = True
338342
reactor = self._get_reactor()
339343
my_peer = self._get_peer_id()
@@ -348,9 +352,13 @@ def _get_p2p_manager(self) -> ConnectionsManager:
348352
ssl=enable_ssl,
349353
whitelist_only=False,
350354
rng=self._rng,
351-
enable_sync_v1=self._enable_sync_v1,
352-
enable_sync_v2=self._enable_sync_v2,
353355
)
356+
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager))
357+
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(p2p_manager))
358+
if self._enable_sync_v1:
359+
p2p_manager.enable_sync_version(SyncVersion.V1_1)
360+
if self._enable_sync_v2:
361+
p2p_manager.enable_sync_version(SyncVersion.V2)
354362
return p2p_manager
355363

356364
def _get_or_create_indexes_manager(self) -> IndexesManager:

hathor/builder/cli_builder.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
6565
from hathor.event.websocket.factory import EventWebsocketFactory
6666
from hathor.p2p.netfilter.utils import add_peer_id_blacklist
6767
from hathor.p2p.peer_discovery import BootstrapPeerDiscovery, DNSPeerDiscovery
68+
from hathor.p2p.sync_v1.factory import SyncV11Factory
69+
from hathor.p2p.sync_v2.factory import SyncV2Factory
70+
from hathor.p2p.sync_version import SyncVersion
6871
from hathor.storage import RocksDBStorage
6972
from hathor.transaction.storage import (
7073
TransactionCacheStorage,
@@ -233,9 +236,13 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
233236
ssl=True,
234237
whitelist_only=False,
235238
rng=Random(),
236-
enable_sync_v1=enable_sync_v1,
237-
enable_sync_v2=enable_sync_v2,
238239
)
240+
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager))
241+
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(p2p_manager))
242+
if enable_sync_v1:
243+
p2p_manager.enable_sync_version(SyncVersion.V1_1)
244+
if enable_sync_v2:
245+
p2p_manager.enable_sync_version(SyncVersion.V2)
239246

240247
self.manager = HathorManager(
241248
reactor,

hathor/p2p/manager.py

+50-30
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class GlobalRateLimiter:
8686
handshaking_peers: set[HathorProtocol]
8787
whitelist_only: bool
8888
_sync_factories: dict[SyncVersion, SyncAgentFactory]
89+
_enabled_sync_versions: set[SyncVersion]
8990

9091
rate_limiter: RateLimiter
9192

@@ -96,15 +97,7 @@ def __init__(self,
9697
pubsub: PubSubManager,
9798
ssl: bool,
9899
rng: Random,
99-
whitelist_only: bool,
100-
enable_sync_v1: bool,
101-
enable_sync_v2: bool) -> None:
102-
from hathor.p2p.sync_v1.factory import SyncV11Factory
103-
from hathor.p2p.sync_v2.factory import SyncV2Factory
104-
105-
if not (enable_sync_v1 or enable_sync_v2):
106-
raise TypeError(f'{type(self).__name__}() at least one sync version is required')
107-
100+
whitelist_only: bool) -> None:
108101
self.log = logger.new()
109102
self.rng = rng
110103
self.manager = None
@@ -184,23 +177,57 @@ def __init__(self,
184177
# Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1
185178
self.whitelist_only = whitelist_only
186179

187-
self.enable_sync_v1 = enable_sync_v1
188-
self.enable_sync_v2 = enable_sync_v2
189-
190180
# Timestamp when the last discovery ran
191181
self._last_discovery: float = 0.
192182

193183
# sync-manager factories
194184
self._sync_factories = {}
195-
if enable_sync_v1:
196-
self._sync_factories[SyncVersion.V1_1] = SyncV11Factory(self)
197-
if enable_sync_v2:
198-
self._sync_factories[SyncVersion.V2] = SyncV2Factory(self)
185+
self._enabled_sync_versions = set()
186+
187+
def add_sync_factory(self, sync_version: SyncVersion, sync_factory: SyncAgentFactory) -> None:
188+
"""Add factory for the given sync version, must use a sync version that does not already exist."""
189+
# XXX: to allow code in `set_manager` to safely use the the available sync versions, we add this restriction:
190+
assert self.manager is None, 'Cannot modify sync factories after a manager is set'
191+
if sync_version in self._sync_factories:
192+
raise ValueError('sync version already exists')
193+
self._sync_factories[sync_version] = sync_factory
194+
195+
def get_available_sync_versions(self) -> set[SyncVersion]:
196+
"""What sync versions the manager is capable of using, they are not necessarily enabled."""
197+
return set(self._sync_factories.keys())
198+
199+
def is_sync_version_available(self, sync_version: SyncVersion) -> bool:
200+
"""Whether the given sync version is available for use, is not necessarily enabled."""
201+
return sync_version in self._sync_factories
202+
203+
def get_enabled_sync_versions(self) -> set[SyncVersion]:
204+
"""What sync versions are enabled for use, it is necessarily a subset of the available versions."""
205+
return self._enabled_sync_versions.copy()
206+
207+
def is_sync_version_enabled(self, sync_version: SyncVersion) -> bool:
208+
"""Whether the given sync version is enabled for use, being enabled implies being available."""
209+
return sync_version in self._enabled_sync_versions
210+
211+
def enable_sync_version(self, sync_version: SyncVersion) -> None:
212+
"""Enable using the given sync version on new connections, it must be available before being enabled."""
213+
assert sync_version in self._sync_factories
214+
if sync_version in self._enabled_sync_versions:
215+
self.log.info('tried to enable a sync verison that was already enabled, nothing to do')
216+
return
217+
self._enabled_sync_versions.add(sync_version)
218+
219+
def disable_sync_version(self, sync_version: SyncVersion) -> None:
220+
"""Disable using the given sync version, closes connections using this sync version."""
221+
if sync_version not in self._enabled_sync_versions:
222+
self.log.info('tried to disable a sync verison that was already disabled, nothing to do')
223+
return
224+
# TODO: close connections using the given sync version
225+
self._enabled_sync_versions.discard(sync_version)
199226

200227
def set_manager(self, manager: 'HathorManager') -> None:
201228
"""Set the manager. This method must be called before start()."""
202229
self.manager = manager
203-
if self.enable_sync_v2:
230+
if self.is_sync_version_available(SyncVersion.V2):
204231
assert self.manager.tx_storage.indexes is not None
205232
indexes = self.manager.tx_storage.indexes
206233
self.log.debug('enable sync-v2 indexes')
@@ -235,6 +262,10 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) ->
235262
)
236263

237264
def start(self) -> None:
265+
"""Listen on the given address descriptions and start accepting and processing connections."""
266+
assert self.manager is not None, 'Cannot start without a manager'
267+
assert len(self._enabled_sync_versions) > 0, 'Cannot start without any sync version enabled'
268+
238269
self.lc_reconnect.start(5, now=False)
239270
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)
240271

@@ -278,20 +309,9 @@ def _get_peers_count(self) -> PeerConnectionsMetrics:
278309
len(self.peer_storage)
279310
)
280311

281-
def get_sync_versions(self) -> set[SyncVersion]:
282-
"""Set of versions that were enabled and are supported."""
283-
assert self.manager is not None
284-
if self.manager.has_sync_version_capability():
285-
return set(self._sync_factories.keys())
286-
else:
287-
assert SyncVersion.V1_1 in self._sync_factories, \
288-
'sync-versions capability disabled, but sync-v1 not enabled'
289-
# XXX: this is to make it easy to simulate old behavior if we disable the sync-version capability
290-
return {SyncVersion.V1_1}
291-
292312
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory:
293-
"""Get the sync factory for a given version, support MUST be checked beforehand or it will raise an assert."""
294-
assert sync_version in self._sync_factories, 'get_sync_factory must be called for a supported version'
313+
"""Get the sync factory for a given version, MUST be available or it will raise an assert."""
314+
assert sync_version in self._sync_factories, f'sync_version {sync_version} is not available'
295315
return self._sync_factories[sync_version]
296316

297317
def has_synced_peer(self) -> bool:

hathor/p2p/states/hello.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ def _get_hello_data(self) -> dict[str, Any]:
6464
return data
6565

6666
def _get_sync_versions(self) -> set[SyncVersion]:
67-
"""Shortcut to ConnectionManager.get_sync_versions"""
67+
"""Shortcut to ConnectionManager.get_enabled_sync_versions"""
6868
connections_manager = self.protocol.connections
6969
assert connections_manager is not None
70-
return connections_manager.get_sync_versions()
70+
return connections_manager.get_enabled_sync_versions()
7171

7272
def on_enter(self) -> None:
7373
# After a connection is made, we just send a HELLO message.

tests/others/test_cli_builder.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ def test_all_default(self):
5757
self.assertIsInstance(manager.tx_storage.indexes, RocksDBIndexesManager)
5858
self.assertIsNone(manager.wallet)
5959
self.assertEqual('unittests', manager.network)
60-
self.assertIn(SyncVersion.V1_1, manager.connections._sync_factories)
61-
self.assertNotIn(SyncVersion.V2, manager.connections._sync_factories)
60+
self.assertTrue(manager.connections.is_sync_version_enabled(SyncVersion.V1_1))
61+
self.assertFalse(manager.connections.is_sync_version_enabled(SyncVersion.V2))
6262
self.assertFalse(self.resources_builder._built_prometheus)
6363
self.assertFalse(self.resources_builder._built_status)
6464
self.assertFalse(manager._enable_event_queue)
@@ -103,13 +103,13 @@ def test_memory_storage_with_rocksdb_indexes(self):
103103

104104
def test_sync_bridge(self):
105105
manager = self._build(['--memory-storage', '--x-sync-bridge'])
106-
self.assertIn(SyncVersion.V1_1, manager.connections._sync_factories)
107-
self.assertIn(SyncVersion.V2, manager.connections._sync_factories)
106+
self.assertTrue(manager.connections.is_sync_version_enabled(SyncVersion.V1_1))
107+
self.assertTrue(manager.connections.is_sync_version_enabled(SyncVersion.V2))
108108

109109
def test_sync_v2_only(self):
110110
manager = self._build(['--memory-storage', '--x-sync-v2-only'])
111-
self.assertNotIn(SyncVersion.V1_1, manager.connections._sync_factories)
112-
self.assertIn(SyncVersion.V2, manager.connections._sync_factories)
111+
self.assertFalse(manager.connections.is_sync_version_enabled(SyncVersion.V1_1))
112+
self.assertTrue(manager.connections.is_sync_version_enabled(SyncVersion.V2))
113113

114114
def test_keypair_wallet(self):
115115
manager = self._build(['--memory-storage', '--wallet', 'keypair'])

tests/p2p/test_sync.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def test_downloader(self):
268268
self.assertTrue(isinstance(conn.proto1.state, PeerIdState))
269269
self.assertTrue(isinstance(conn.proto2.state, PeerIdState))
270270

271-
downloader = conn.proto2.connections._sync_factories[SyncVersion.V1_1].get_downloader()
271+
downloader = conn.proto2.connections.get_sync_factory(SyncVersion.V1_1).get_downloader()
272272

273273
node_sync1 = NodeSyncTimestamp(conn.proto1, downloader, reactor=conn.proto1.node.reactor)
274274
node_sync1.start()
@@ -361,7 +361,7 @@ def _downloader_bug_setup(self):
361361

362362
# create the peer that will experience the bug
363363
self.manager_bug = self.create_peer(self.network)
364-
self.downloader = self.manager_bug.connections._sync_factories[SyncVersion.V1_1].get_downloader()
364+
self.downloader = self.manager_bug.connections.get_sync_factory(SyncVersion.V1_1).get_downloader()
365365
self.downloader.window_size = 1
366366
self.conn1 = FakeConnection(self.manager_bug, self.manager1)
367367
self.conn2 = FakeConnection(self.manager_bug, self.manager2)

tests/p2p/test_whitelist.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ def test_sync_v11_whitelist_no_no(self):
1414
network = 'testnet'
1515

1616
manager1 = self.create_peer(network)
17-
self.assertEqual(set(manager1.connections._sync_factories.keys()), {SyncVersion.V1_1})
17+
self.assertEqual(manager1.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
1818

1919
manager2 = self.create_peer(network)
20-
self.assertEqual(set(manager2.connections._sync_factories.keys()), {SyncVersion.V1_1})
20+
self.assertEqual(manager2.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
2121

2222
conn = FakeConnection(manager1, manager2)
2323
self.assertFalse(conn.tr1.disconnecting)
@@ -36,10 +36,10 @@ def test_sync_v11_whitelist_yes_no(self):
3636
network = 'testnet'
3737

3838
manager1 = self.create_peer(network)
39-
self.assertEqual(set(manager1.connections._sync_factories.keys()), {SyncVersion.V1_1})
39+
self.assertEqual(manager1.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
4040

4141
manager2 = self.create_peer(network)
42-
self.assertEqual(set(manager2.connections._sync_factories.keys()), {SyncVersion.V1_1})
42+
self.assertEqual(manager2.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
4343

4444
manager1.peers_whitelist.append(manager2.my_peer.id)
4545

@@ -60,10 +60,10 @@ def test_sync_v11_whitelist_yes_yes(self):
6060
network = 'testnet'
6161

6262
manager1 = self.create_peer(network)
63-
self.assertEqual(set(manager1.connections._sync_factories.keys()), {SyncVersion.V1_1})
63+
self.assertEqual(manager1.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
6464

6565
manager2 = self.create_peer(network)
66-
self.assertEqual(set(manager2.connections._sync_factories.keys()), {SyncVersion.V1_1})
66+
self.assertEqual(manager2.connections.get_enabled_sync_versions(), {SyncVersion.V1_1})
6767

6868
manager1.peers_whitelist.append(manager2.my_peer.id)
6969
manager2.peers_whitelist.append(manager1.my_peer.id)

tests/unittest.py

+2-8
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,8 @@ def create_peer(self, network, peer_id=None, wallet=None, tx_storage=None, unloc
248248
manager = self.create_peer_from_builder(builder, start_manager=start_manager)
249249

250250
# XXX: just making sure that tests set this up correctly
251-
if enable_sync_v2:
252-
assert SyncVersion.V2 in manager.connections._sync_factories
253-
else:
254-
assert SyncVersion.V2 not in manager.connections._sync_factories
255-
if enable_sync_v1:
256-
assert SyncVersion.V1_1 in manager.connections._sync_factories
257-
else:
258-
assert SyncVersion.V1_1 not in manager.connections._sync_factories
251+
assert manager.connections.is_sync_version_enabled(SyncVersion.V2) == enable_sync_v2
252+
assert manager.connections.is_sync_version_enabled(SyncVersion.V1_1) == enable_sync_v1
259253

260254
return manager
261255

0 commit comments

Comments
 (0)