27
27
28
28
from synapse .api .constants import EduTypes
29
29
from synapse .api .errors import CodeMessageException , Codes , NotFoundError , SynapseError
30
+ from synapse .handlers .device import DeviceHandler
30
31
from synapse .logging .context import make_deferred_yieldable , run_in_background
31
32
from synapse .logging .opentracing import log_kv , set_tag , tag_args , trace
32
- from synapse .replication .http .devices import ReplicationUserDevicesResyncRestServlet
33
33
from synapse .types import (
34
34
JsonDict ,
35
35
UserID ,
@@ -56,27 +56,23 @@ def __init__(self, hs: "HomeServer"):
56
56
self .is_mine = hs .is_mine
57
57
self .clock = hs .get_clock ()
58
58
59
- self ._edu_updater = SigningKeyEduUpdater (hs , self )
60
-
61
59
federation_registry = hs .get_federation_registry ()
62
60
63
- self ._is_master = hs .config .worker .worker_app is None
64
- if not self ._is_master :
65
- self ._user_device_resync_client = (
66
- ReplicationUserDevicesResyncRestServlet .make_client (hs )
67
- )
68
- else :
61
+ is_master = hs .config .worker .worker_app is None
62
+ if is_master :
63
+ edu_updater = SigningKeyEduUpdater (hs )
64
+
69
65
# Only register this edu handler on master as it requires writing
70
66
# device updates to the db
71
67
federation_registry .register_edu_handler (
72
68
EduTypes .SIGNING_KEY_UPDATE ,
73
- self . _edu_updater .incoming_signing_key_update ,
69
+ edu_updater .incoming_signing_key_update ,
74
70
)
75
71
# also handle the unstable version
76
72
# FIXME: remove this when enough servers have upgraded
77
73
federation_registry .register_edu_handler (
78
74
EduTypes .UNSTABLE_SIGNING_KEY_UPDATE ,
79
- self . _edu_updater .incoming_signing_key_update ,
75
+ edu_updater .incoming_signing_key_update ,
80
76
)
81
77
82
78
# doesn't really work as part of the generic query API, because the
@@ -319,14 +315,13 @@ async def _query_devices_for_destination(
319
315
# probably be tracking their device lists. However, we haven't
320
316
# done an initial sync on the device list so we do it now.
321
317
try :
322
- if self . _is_master :
323
- resync_results = await self .device_handler .device_list_updater .user_device_resync (
318
+ resync_results = (
319
+ await self .device_handler .device_list_updater .user_device_resync (
324
320
user_id
325
321
)
326
- else :
327
- resync_results = await self ._user_device_resync_client (
328
- user_id = user_id
329
- )
322
+ )
323
+ if resync_results is None :
324
+ raise ValueError ("Device resync failed" )
330
325
331
326
# Add the device keys to the results.
332
327
user_devices = resync_results ["devices" ]
@@ -605,6 +600,8 @@ async def claim_client_keys(destination: str) -> None:
605
600
async def upload_keys_for_user (
606
601
self , user_id : str , device_id : str , keys : JsonDict
607
602
) -> JsonDict :
603
+ # This can only be called from the main process.
604
+ assert isinstance (self .device_handler , DeviceHandler )
608
605
609
606
time_now = self .clock .time_msec ()
610
607
@@ -732,6 +729,8 @@ async def upload_signing_keys_for_user(
732
729
user_id: the user uploading the keys
733
730
keys: the signing keys
734
731
"""
732
+ # This can only be called from the main process.
733
+ assert isinstance (self .device_handler , DeviceHandler )
735
734
736
735
# if a master key is uploaded, then check it. Otherwise, load the
737
736
# stored master key, to check signatures on other keys
@@ -823,6 +822,9 @@ async def upload_signatures_for_device_keys(
823
822
Raises:
824
823
SynapseError: if the signatures dict is not valid.
825
824
"""
825
+ # This can only be called from the main process.
826
+ assert isinstance (self .device_handler , DeviceHandler )
827
+
826
828
failures = {}
827
829
828
830
# signatures to be stored. Each item will be a SignatureListItem
@@ -1200,6 +1202,9 @@ async def _retrieve_cross_signing_keys_for_remote_user(
1200
1202
A tuple of the retrieved key content, the key's ID and the matching VerifyKey.
1201
1203
If the key cannot be retrieved, all values in the tuple will instead be None.
1202
1204
"""
1205
+ # This can only be called from the main process.
1206
+ assert isinstance (self .device_handler , DeviceHandler )
1207
+
1203
1208
try :
1204
1209
remote_result = await self .federation .query_user_devices (
1205
1210
user .domain , user .to_string ()
@@ -1396,11 +1401,14 @@ class SignatureListItem:
1396
1401
class SigningKeyEduUpdater :
1397
1402
"""Handles incoming signing key updates from federation and updates the DB"""
1398
1403
1399
- def __init__ (self , hs : "HomeServer" , e2e_keys_handler : E2eKeysHandler ):
1404
+ def __init__ (self , hs : "HomeServer" ):
1400
1405
self .store = hs .get_datastores ().main
1401
1406
self .federation = hs .get_federation_client ()
1402
1407
self .clock = hs .get_clock ()
1403
- self .e2e_keys_handler = e2e_keys_handler
1408
+
1409
+ device_handler = hs .get_device_handler ()
1410
+ assert isinstance (device_handler , DeviceHandler )
1411
+ self ._device_handler = device_handler
1404
1412
1405
1413
self ._remote_edu_linearizer = Linearizer (name = "remote_signing_key" )
1406
1414
@@ -1445,9 +1453,6 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
1445
1453
user_id: the user whose updates we are processing
1446
1454
"""
1447
1455
1448
- device_handler = self .e2e_keys_handler .device_handler
1449
- device_list_updater = device_handler .device_list_updater
1450
-
1451
1456
async with self ._remote_edu_linearizer .queue (user_id ):
1452
1457
pending_updates = self ._pending_updates .pop (user_id , [])
1453
1458
if not pending_updates :
@@ -1459,13 +1464,11 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
1459
1464
logger .info ("pending updates: %r" , pending_updates )
1460
1465
1461
1466
for master_key , self_signing_key in pending_updates :
1462
- new_device_ids = (
1463
- await device_list_updater .process_cross_signing_key_update (
1464
- user_id ,
1465
- master_key ,
1466
- self_signing_key ,
1467
- )
1467
+ new_device_ids = await self ._device_handler .device_list_updater .process_cross_signing_key_update (
1468
+ user_id ,
1469
+ master_key ,
1470
+ self_signing_key ,
1468
1471
)
1469
1472
device_ids = device_ids + new_device_ids
1470
1473
1471
- await device_handler .notify_device_update (user_id , device_ids )
1474
+ await self . _device_handler .notify_device_update (user_id , device_ids )
0 commit comments