Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit cc66e18

Browse files
committed
Track presence state per-device and amalgamate to a user state.
Tracks presence on an individual per-device basis and combines the per-device state into a per-user state. This should help in situations where a user has two devices with conflicting status (e.g. one is syncing with unavailable and one is syncing with online). The tie-breaking is done by priority: BUSY > ONLINE > UNAVAILABLE > OFFLINE
1 parent d98a43d commit cc66e18

File tree

8 files changed

+148
-29
lines changed

8 files changed

+148
-29
lines changed

changelog.d/16066.bugfix

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
2+

synapse/api/presence.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@
2020
from synapse.types import JsonDict
2121

2222

23+
@attr.s(slots=True, auto_attribs=True)
24+
class UserDevicePresenceState:
25+
user_id: str
26+
device_id: Optional[str]
27+
state: str
28+
last_active_ts: int
29+
last_user_sync_ts: int
30+
status_msg: Optional[str]
31+
32+
2333
@attr.s(slots=True, frozen=True, auto_attribs=True)
2434
class UserPresenceState:
2535
"""Represents the current presence state of the user.

synapse/handlers/events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ async def get_stream(
6767

6868
context = await presence_handler.user_syncing(
6969
requester.user.to_string(),
70+
requester.device_id,
7071
affect_presence=affect_presence,
7172
presence_state=PresenceState.ONLINE,
7273
)

synapse/handlers/presence.py

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import synapse.metrics
5050
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
5151
from synapse.api.errors import SynapseError
52-
from synapse.api.presence import UserPresenceState
52+
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
5353
from synapse.appservice import ApplicationService
5454
from synapse.events.presence_router import PresenceRouter
5555
from synapse.logging.context import run_in_background
@@ -150,11 +150,16 @@ def __init__(self, hs: "HomeServer"):
150150
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
151151

152152
active_presence = self.store.take_presence_startup_info()
153+
# The combine status across all user devices.
153154
self.user_to_current_state = {state.user_id: state for state in active_presence}
154155

155156
@abc.abstractmethod
156157
async def user_syncing(
157-
self, user_id: str, affect_presence: bool, presence_state: str
158+
self,
159+
user_id: str,
160+
device_id: Optional[str],
161+
affect_presence: bool,
162+
presence_state: str,
158163
) -> ContextManager[None]:
159164
"""Returns a context manager that should surround any stream requests
160165
from the user.
@@ -241,6 +246,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState:
241246
async def set_state(
242247
self,
243248
target_user: UserID,
249+
device_id: Optional[str],
244250
state: JsonDict,
245251
ignore_status_msg: bool = False,
246252
force_notify: bool = False,
@@ -368,7 +374,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
368374
# We set force_notify=True here so that this presence update is guaranteed to
369375
# increment the presence stream ID (which resending the current user's presence
370376
# otherwise would not do).
371-
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
377+
await self.set_state(
378+
UserID.from_string(user_id), None, state, force_notify=True
379+
)
372380

373381
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
374382
raise NotImplementedError(
@@ -472,7 +480,11 @@ def send_stop_syncing(self) -> None:
472480
self.send_user_sync(user_id, False, last_sync_ms)
473481

474482
async def user_syncing(
475-
self, user_id: str, affect_presence: bool, presence_state: str
483+
self,
484+
user_id: str,
485+
device_id: Optional[str],
486+
affect_presence: bool,
487+
presence_state: str,
476488
) -> ContextManager[None]:
477489
"""Record that a user is syncing.
478490
@@ -490,7 +502,10 @@ async def user_syncing(
490502
# what the spec wants: see comment in the BasePresenceHandler version
491503
# of this function.
492504
await self.set_state(
493-
UserID.from_string(user_id), {"presence": presence_state}, True
505+
UserID.from_string(user_id),
506+
device_id,
507+
{"presence": presence_state},
508+
True,
494509
)
495510

496511
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
@@ -586,6 +601,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
586601
async def set_state(
587602
self,
588603
target_user: UserID,
604+
device_id: Optional[str],
589605
state: JsonDict,
590606
ignore_status_msg: bool = False,
591607
force_notify: bool = False,
@@ -623,6 +639,7 @@ async def set_state(
623639
await self._set_state_client(
624640
instance_name=self._presence_writer_instance,
625641
user_id=user_id,
642+
device_id=device_id,
626643
state=state,
627644
ignore_status_msg=ignore_status_msg,
628645
force_notify=force_notify,
@@ -755,6 +772,11 @@ def run_persister() -> Awaitable[None]:
755772
self._event_pos = self.store.get_room_max_stream_ordering()
756773
self._event_processing = False
757774

775+
# The per-device presence state, maps user to devices to per-device presence state.
776+
self.user_to_device_to_current_state: Dict[
777+
str, Dict[Optional[str], UserDevicePresenceState]
778+
] = {}
779+
758780
async def _on_shutdown(self) -> None:
759781
"""Gets called when shutting down. This lets us persist any updates that
760782
we haven't yet persisted, e.g. updates that only changes some internal
@@ -973,6 +995,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
973995
async def user_syncing(
974996
self,
975997
user_id: str,
998+
device_id: Optional[str],
976999
affect_presence: bool = True,
9771000
presence_state: str = PresenceState.ONLINE,
9781001
) -> ContextManager[None]:
@@ -985,6 +1008,7 @@ async def user_syncing(
9851008
9861009
Args:
9871010
user_id
1011+
device_id
9881012
affect_presence: If false this function will be a no-op.
9891013
Useful for streams that are not associated with an actual
9901014
client that is being used by a user.
@@ -1010,7 +1034,10 @@ async def user_syncing(
10101034
# updated always, which is not what the spec calls for, but synapse has done
10111035
# this for... forever, I think.
10121036
await self.set_state(
1013-
UserID.from_string(user_id), {"presence": presence_state}, True
1037+
UserID.from_string(user_id),
1038+
device_id,
1039+
{"presence": presence_state},
1040+
True,
10141041
)
10151042
# Retrieve the new state for the logic below. This should come from the
10161043
# in-memory cache.
@@ -1213,6 +1240,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
12131240
async def set_state(
12141241
self,
12151242
target_user: UserID,
1243+
device_id: Optional[str],
12161244
state: JsonDict,
12171245
ignore_status_msg: bool = False,
12181246
force_notify: bool = False,
@@ -1221,6 +1249,7 @@ async def set_state(
12211249
12221250
Args:
12231251
target_user: The ID of the user to set the presence state of.
1252+
device_id: The optional device ID.
12241253
state: The presence state as a JSON dictionary.
12251254
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
12261255
If False, the user's current status will be updated.
@@ -1249,6 +1278,42 @@ async def set_state(
12491278

12501279
prev_state = await self.current_state_for_user(user_id)
12511280

1281+
# Always update the device specific information.
1282+
# Get the previous device state.
1283+
device_state = self.user_to_device_to_current_state.setdefault(
1284+
user_id, {}
1285+
).setdefault(
1286+
device_id,
1287+
UserDevicePresenceState(
1288+
user_id,
1289+
device_id,
1290+
presence,
1291+
last_active_ts=self.clock.time_msec(),
1292+
last_user_sync_ts=self.clock.time_msec(),
1293+
status_msg=None,
1294+
),
1295+
)
1296+
device_state.state = presence
1297+
if presence:
1298+
device_state.status_msg = status_msg
1299+
device_state.last_active_ts = self.clock.time_msec()
1300+
device_state.last_user_sync_ts = self.clock.time_msec()
1301+
1302+
# Based on (all) the user's devices calculate the new presence state.
1303+
presence_by_priority = {
1304+
PresenceState.BUSY: 4,
1305+
PresenceState.ONLINE: 3,
1306+
PresenceState.UNAVAILABLE: 2,
1307+
PresenceState.OFFLINE: 1,
1308+
}
1309+
for device_state in self.user_to_device_to_current_state[user_id].values():
1310+
if (
1311+
presence_by_priority[device_state.state]
1312+
> presence_by_priority[presence]
1313+
):
1314+
presence = device_state.state
1315+
1316+
# The newly updated status as an amalgomation of all the device statuses.
12521317
new_fields = {"state": presence}
12531318

12541319
if not ignore_status_msg:
@@ -1962,6 +2027,7 @@ def handle_update(
19622027
# If the users are ours then we want to set up a bunch of timers
19632028
# to time things out.
19642029
if is_mine:
2030+
# TODO Maybe don't do this if currently active?
19652031
if new_state.state == PresenceState.ONLINE:
19662032
# Idle timer
19672033
wheel_timer.insert(

synapse/replication/http/presence.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import logging
16-
from typing import TYPE_CHECKING, Tuple
16+
from typing import TYPE_CHECKING, Optional, Tuple
1717

1818
from twisted.web.server import Request
1919

@@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"):
9595
@staticmethod
9696
async def _serialize_payload( # type: ignore[override]
9797
user_id: str,
98+
device_id: Optional[str],
9899
state: JsonDict,
99100
ignore_status_msg: bool = False,
100101
force_notify: bool = False,
101102
) -> JsonDict:
102103
return {
104+
"device_id": device_id,
103105
"state": state,
104106
"ignore_status_msg": ignore_status_msg,
105107
"force_notify": force_notify,
@@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override]
110112
) -> Tuple[int, JsonDict]:
111113
await self._presence_handler.set_state(
112114
UserID.from_string(user_id),
115+
content["device_id"],
113116
content["state"],
114117
content["ignore_status_msg"],
115118
content["force_notify"],

synapse/rest/client/presence.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async def on_PUT(
9797
raise SynapseError(400, "Unable to parse state")
9898

9999
if self._use_presence:
100-
await self.presence_handler.set_state(user, state)
100+
await self.presence_handler.set_state(user, requester.device_id, state)
101101

102102
return 200, {}
103103

synapse/rest/client/sync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
205205

206206
context = await self.presence_handler.user_syncing(
207207
user.to_string(),
208+
requester.device_id,
208209
affect_presence=affect_presence,
209210
presence_state=set_presence,
210211
)

0 commit comments

Comments
 (0)