23
23
"""
24
24
import abc
25
25
import contextlib
26
+ import itertools
26
27
import logging
27
28
from bisect import bisect
28
29
from contextlib import contextmanager
@@ -188,15 +189,17 @@ async def user_syncing(
188
189
"""
189
190
190
191
@abc .abstractmethod
191
- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
192
- """Get an iterable of syncing users on this worker, to send to the presence handler
192
+ def get_currently_syncing_users_for_replication (
193
+ self ,
194
+ ) -> Iterable [Tuple [str , Optional [str ]]]:
195
+ """Get an iterable of syncing users and devices on this worker, to send to the presence handler
193
196
194
197
This is called when a replication connection is established. It should return
195
- a list of user ids , which are then sent as USER_SYNC commands to inform the
196
- process handling presence about those users.
198
+ a list of tuples of user ID & device ID , which are then sent as USER_SYNC commands
199
+ to inform the process handling presence about those users/devices .
197
200
198
201
Returns:
199
- An iterable of user_id strings .
202
+ An iterable of tuples of user ID and device ID .
200
203
"""
201
204
202
205
async def get_state (self , target_user : UserID ) -> UserPresenceState :
@@ -284,7 +287,12 @@ async def bump_presence_active_time(
284
287
"""
285
288
286
289
async def update_external_syncs_row ( # noqa: B027 (no-op by design)
287
- self , process_id : str , user_id : str , is_syncing : bool , sync_time_msec : int
290
+ self ,
291
+ process_id : str ,
292
+ user_id : str ,
293
+ device_id : Optional [str ],
294
+ is_syncing : bool ,
295
+ sync_time_msec : int ,
288
296
) -> None :
289
297
"""Update the syncing users for an external process as a delta.
290
298
@@ -295,6 +303,7 @@ async def update_external_syncs_row( # noqa: B027 (no-op by design)
295
303
syncing against. This allows synapse to process updates
296
304
as user start and stop syncing against a given process.
297
305
user_id: The user who has started or stopped syncing
306
+ device_id: The user's device that has started or stopped syncing
298
307
is_syncing: Whether or not the user is now syncing
299
308
sync_time_msec: Time in ms when the user was last syncing
300
309
"""
@@ -425,16 +434,18 @@ def __init__(self, hs: "HomeServer"):
425
434
hs .config .worker .writers .presence ,
426
435
)
427
436
428
- # The number of ongoing syncs on this process, by user id .
437
+ # The number of ongoing syncs on this process, by ( user ID, device ID) .
429
438
# Empty if _presence_enabled is false.
430
- self ._user_to_num_current_syncs : Dict [str , int ] = {}
439
+ self ._user_device_to_num_current_syncs : Dict [
440
+ Tuple [str , Optional [str ]], int
441
+ ] = {}
431
442
432
443
self .notifier = hs .get_notifier ()
433
444
self .instance_id = hs .get_instance_id ()
434
445
435
- # user_id -> last_sync_ms. Lists the users that have stopped syncing but
436
- # we haven't notified the presence writer of that yet
437
- self .users_going_offline : Dict [str , int ] = {}
446
+ # ( user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
447
+ # syncing but we haven't notified the presence writer of that yet
448
+ self ._user_devices_going_offline : Dict [Tuple [ str , Optional [ str ]] , int ] = {}
438
449
439
450
self ._bump_active_client = ReplicationBumpPresenceActiveTime .make_client (hs )
440
451
self ._set_state_client = ReplicationPresenceSetState .make_client (hs )
@@ -457,39 +468,47 @@ async def _on_shutdown(self) -> None:
457
468
ClearUserSyncsCommand (self .instance_id )
458
469
)
459
470
460
- def send_user_sync (self , user_id : str , is_syncing : bool , last_sync_ms : int ) -> None :
471
+ def send_user_sync (
472
+ self ,
473
+ user_id : str ,
474
+ device_id : Optional [str ],
475
+ is_syncing : bool ,
476
+ last_sync_ms : int ,
477
+ ) -> None :
461
478
if self ._presence_enabled :
462
479
self .hs .get_replication_command_handler ().send_user_sync (
463
- self .instance_id , user_id , is_syncing , last_sync_ms
480
+ self .instance_id , user_id , device_id , is_syncing , last_sync_ms
464
481
)
465
482
466
- def mark_as_coming_online (self , user_id : str ) -> None :
483
+ def mark_as_coming_online (self , user_id : str , device_id : Optional [ str ] ) -> None :
467
484
"""A user has started syncing. Send a UserSync to the presence writer,
468
485
unless they had recently stopped syncing.
469
486
"""
470
- going_offline = self .users_going_offline .pop (user_id , None )
487
+ going_offline = self ._user_devices_going_offline .pop (( user_id , device_id ) , None )
471
488
if not going_offline :
472
489
# Safe to skip because we haven't yet told the presence writer they
473
490
# were offline
474
- self .send_user_sync (user_id , True , self .clock .time_msec ())
491
+ self .send_user_sync (user_id , device_id , True , self .clock .time_msec ())
475
492
476
- def mark_as_going_offline (self , user_id : str ) -> None :
493
+ def mark_as_going_offline (self , user_id : str , device_id : Optional [ str ] ) -> None :
477
494
"""A user has stopped syncing. We wait before notifying the presence
478
495
writer as its likely they'll come back soon. This allows us to avoid
479
496
sending a stopped syncing immediately followed by a started syncing
480
497
notification to the presence writer
481
498
"""
482
- self .users_going_offline [ user_id ] = self .clock .time_msec ()
499
+ self ._user_devices_going_offline [( user_id , device_id ) ] = self .clock .time_msec ()
483
500
484
501
def send_stop_syncing (self ) -> None :
485
502
"""Check if there are any users who have stopped syncing a while ago and
486
503
haven't come back yet. If there are poke the presence writer about them.
487
504
"""
488
505
now = self .clock .time_msec ()
489
- for user_id , last_sync_ms in list (self .users_going_offline .items ()):
506
+ for (user_id , device_id ), last_sync_ms in list (
507
+ self ._user_devices_going_offline .items ()
508
+ ):
490
509
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS :
491
- self .users_going_offline .pop (user_id , None )
492
- self .send_user_sync (user_id , False , last_sync_ms )
510
+ self ._user_devices_going_offline .pop (( user_id , device_id ) , None )
511
+ self .send_user_sync (user_id , device_id , False , last_sync_ms )
493
512
494
513
async def user_syncing (
495
514
self ,
@@ -515,23 +534,23 @@ async def user_syncing(
515
534
is_sync = True ,
516
535
)
517
536
518
- curr_sync = self ._user_to_num_current_syncs .get (user_id , 0 )
519
- self ._user_to_num_current_syncs [ user_id ] = curr_sync + 1
537
+ curr_sync = self ._user_device_to_num_current_syncs .get (( user_id , device_id ) , 0 )
538
+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] = curr_sync + 1
520
539
521
540
# If this is the first in-flight sync, notify replication
522
- if self ._user_to_num_current_syncs [ user_id ] == 1 :
523
- self .mark_as_coming_online (user_id )
541
+ if self ._user_device_to_num_current_syncs [( user_id , device_id ) ] == 1 :
542
+ self .mark_as_coming_online (user_id , device_id )
524
543
525
544
def _end () -> None :
526
545
# We check that the user_id is in user_to_num_current_syncs because
527
546
# user_to_num_current_syncs may have been cleared if we are
528
547
# shutting down.
529
- if user_id in self ._user_to_num_current_syncs :
530
- self ._user_to_num_current_syncs [ user_id ] -= 1
548
+ if ( user_id , device_id ) in self ._user_device_to_num_current_syncs :
549
+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] -= 1
531
550
532
551
# If there are no more in-flight syncs, notify replication
533
- if self ._user_to_num_current_syncs [ user_id ] == 0 :
534
- self .mark_as_going_offline (user_id )
552
+ if self ._user_device_to_num_current_syncs [( user_id , device_id ) ] == 0 :
553
+ self .mark_as_going_offline (user_id , device_id )
535
554
536
555
@contextlib .contextmanager
537
556
def _user_syncing () -> Generator [None , None , None ]:
@@ -598,10 +617,12 @@ async def process_replication_rows(
598
617
# If this is a federation sender, notify about presence updates.
599
618
await self .maybe_send_presence_to_interested_destinations (state_to_notify )
600
619
601
- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
620
+ def get_currently_syncing_users_for_replication (
621
+ self ,
622
+ ) -> Iterable [Tuple [str , Optional [str ]]]:
602
623
return [
603
- user_id
604
- for user_id , count in self ._user_to_num_current_syncs .items ()
624
+ user_id_device_id
625
+ for user_id_device_id , count in self ._user_device_to_num_current_syncs .items ()
605
626
if count > 0
606
627
]
607
628
@@ -723,17 +744,23 @@ def __init__(self, hs: "HomeServer"):
723
744
724
745
# Keeps track of the number of *ongoing* syncs on this process. While
725
746
# this is non zero a user will never go offline.
726
- self .user_to_num_current_syncs : Dict [str , int ] = {}
747
+ self ._user_device_to_num_current_syncs : Dict [
748
+ Tuple [str , Optional [str ]], int
749
+ ] = {}
727
750
728
751
# Keeps track of the number of *ongoing* syncs on other processes.
752
+ #
729
753
# While any sync is ongoing on another process the user will never
730
754
# go offline.
755
+ #
731
756
# Each process has a unique identifier and an update frequency. If
732
757
# no update is received from that process within the update period then
733
758
# we assume that all the sync requests on that process have stopped.
734
- # Stored as a dict from process_id to set of user_id, and a dict of
735
- # process_id to millisecond timestamp last updated.
736
- self .external_process_to_current_syncs : Dict [str , Set [str ]] = {}
759
+ # Stored as a dict from process_id to set of (user_id, device_id), and
760
+ # a dict of process_id to millisecond timestamp last updated.
761
+ self .external_process_to_current_syncs : Dict [
762
+ str , Set [Tuple [str , Optional [str ]]]
763
+ ] = {}
737
764
self .external_process_last_updated_ms : Dict [str , int ] = {}
738
765
739
766
self .external_sync_linearizer = Linearizer (name = "external_sync_linearizer" )
@@ -938,7 +965,10 @@ async def _handle_timeouts(self) -> None:
938
965
# that were syncing on that process to see if they need to be timed
939
966
# out.
940
967
users_to_check .update (
941
- self .external_process_to_current_syncs .pop (process_id , ())
968
+ user_id
969
+ for user_id , device_id in self .external_process_to_current_syncs .pop (
970
+ process_id , ()
971
+ )
942
972
)
943
973
self .external_process_last_updated_ms .pop (process_id )
944
974
@@ -951,11 +981,15 @@ async def _handle_timeouts(self) -> None:
951
981
952
982
syncing_user_ids = {
953
983
user_id
954
- for user_id , count in self .user_to_num_current_syncs .items ()
984
+ for ( user_id , _ ), count in self ._user_device_to_num_current_syncs .items ()
955
985
if count
956
986
}
957
- for user_ids in self .external_process_to_current_syncs .values ():
958
- syncing_user_ids .update (user_ids )
987
+ syncing_user_ids .update (
988
+ user_id
989
+ for user_id , _ in itertools .chain (
990
+ * self .external_process_to_current_syncs .values ()
991
+ )
992
+ )
959
993
960
994
changes = handle_timeouts (
961
995
states ,
@@ -1013,8 +1047,8 @@ async def user_syncing(
1013
1047
if not affect_presence or not self ._presence_enabled :
1014
1048
return _NullContextManager ()
1015
1049
1016
- curr_sync = self .user_to_num_current_syncs .get (user_id , 0 )
1017
- self .user_to_num_current_syncs [ user_id ] = curr_sync + 1
1050
+ curr_sync = self ._user_device_to_num_current_syncs .get (( user_id , device_id ) , 0 )
1051
+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] = curr_sync + 1
1018
1052
1019
1053
# Note that this causes last_active_ts to be incremented which is not
1020
1054
# what the spec wants.
@@ -1027,7 +1061,7 @@ async def user_syncing(
1027
1061
1028
1062
async def _end () -> None :
1029
1063
try :
1030
- self .user_to_num_current_syncs [ user_id ] -= 1
1064
+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] -= 1
1031
1065
1032
1066
prev_state = await self .current_state_for_user (user_id )
1033
1067
await self ._update_states (
@@ -1049,12 +1083,19 @@ def _user_syncing() -> Generator[None, None, None]:
1049
1083
1050
1084
return _user_syncing ()
1051
1085
1052
- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
1086
+ def get_currently_syncing_users_for_replication (
1087
+ self ,
1088
+ ) -> Iterable [Tuple [str , Optional [str ]]]:
1053
1089
# since we are the process handling presence, there is nothing to do here.
1054
1090
return []
1055
1091
1056
1092
async def update_external_syncs_row (
1057
- self , process_id : str , user_id : str , is_syncing : bool , sync_time_msec : int
1093
+ self ,
1094
+ process_id : str ,
1095
+ user_id : str ,
1096
+ device_id : Optional [str ],
1097
+ is_syncing : bool ,
1098
+ sync_time_msec : int ,
1058
1099
) -> None :
1059
1100
"""Update the syncing users for an external process as a delta.
1060
1101
@@ -1063,6 +1104,7 @@ async def update_external_syncs_row(
1063
1104
syncing against. This allows synapse to process updates
1064
1105
as user start and stop syncing against a given process.
1065
1106
user_id: The user who has started or stopped syncing
1107
+ device_id: The user's device that has started or stopped syncing
1066
1108
is_syncing: Whether or not the user is now syncing
1067
1109
sync_time_msec: Time in ms when the user was last syncing
1068
1110
"""
@@ -1073,26 +1115,27 @@ async def update_external_syncs_row(
1073
1115
process_id , set ()
1074
1116
)
1075
1117
1076
- # USER_SYNC is sent when a user starts or stops syncing on a remote
1077
- # process. (But only for the initial and last device.)
1118
+ # USER_SYNC is sent when a user's device starts or stops syncing on
1119
+ # a remote # process. (But only for the initial and last sync for that
1120
+ # device.)
1078
1121
#
1079
- # When a user *starts* syncing it also calls set_state(...) which
1122
+ # When a device *starts* syncing it also calls set_state(...) which
1080
1123
# will update the state, last_active_ts, and last_user_sync_ts.
1081
- # Simply ensure the user is tracked as syncing in this case.
1124
+ # Simply ensure the user & device is tracked as syncing in this case.
1082
1125
#
1083
- # When a user *stops* syncing, update the last_user_sync_ts and mark
1126
+ # When a device *stops* syncing, update the last_user_sync_ts and mark
1084
1127
# them as no longer syncing. Note this doesn't quite match the
1085
1128
# monolith behaviour, which updates last_user_sync_ts at the end of
1086
1129
# every sync, not just the last in-flight sync.
1087
- if is_syncing and user_id not in process_presence :
1088
- process_presence .add (user_id )
1089
- elif not is_syncing and user_id in process_presence :
1130
+ if is_syncing and ( user_id , device_id ) not in process_presence :
1131
+ process_presence .add (( user_id , device_id ) )
1132
+ elif not is_syncing and ( user_id , device_id ) in process_presence :
1090
1133
new_state = prev_state .copy_and_replace (
1091
1134
last_user_sync_ts = sync_time_msec
1092
1135
)
1093
1136
await self ._update_states ([new_state ])
1094
1137
1095
- process_presence .discard (user_id )
1138
+ process_presence .discard (( user_id , device_id ) )
1096
1139
1097
1140
self .external_process_last_updated_ms [process_id ] = self .clock .time_msec ()
1098
1141
@@ -1106,7 +1149,9 @@ async def update_external_syncs_clear(self, process_id: str) -> None:
1106
1149
process_presence = self .external_process_to_current_syncs .pop (
1107
1150
process_id , set ()
1108
1151
)
1109
- prev_states = await self .current_state_for_users (process_presence )
1152
+ prev_states = await self .current_state_for_users (
1153
+ {user_id for user_id , device_id in process_presence }
1154
+ )
1110
1155
time_now_ms = self .clock .time_msec ()
1111
1156
1112
1157
await self ._update_states (
0 commit comments