Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Combine logic about not overriding BUSY presence. #16170

Merged
merged 7 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16170.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify presence code when using workers.
155 changes: 63 additions & 92 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,13 @@ def __init__(self, hs: "HomeServer"):

self._federation_queue = PresenceFederationQueue(hs, self)

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)

if self._busy_presence_enabled:
if hs.config.experimental.msc3026_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)

active_presence = self.store.take_presence_startup_info()
Expand Down Expand Up @@ -255,17 +253,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""

@abc.abstractmethod
Expand Down Expand Up @@ -491,23 +491,18 @@ async def user_syncing(
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

prev_state = await self.current_state_for_user(user_id)
if prev_state.state != PresenceState.BUSY:
# We set state here but pass ignore_status_msg = True as we don't want to
# cause the status message to be cleared.
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants: see comment in the BasePresenceHandler version
# of this function.
await self.set_state(
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
state={"presence": presence_state},
is_sync=True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1

# If we went from no in flight sync to some, notify replication
# If this is the first in-flight sync, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)

Expand All @@ -518,7 +513,7 @@ def _end() -> None:
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1

# If we went from one in flight sync to non, notify replication
# If there are no more in-flight syncs, notify replication
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)

Expand Down Expand Up @@ -598,17 +593,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""
presence = state["presence"]

Expand All @@ -626,8 +623,8 @@ async def set_state(
instance_name=self._presence_writer_instance,
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
is_sync=is_sync,
)

async def bump_presence_active_time(self, user: UserID) -> None:
Expand Down Expand Up @@ -992,45 +989,13 @@ async def user_syncing(
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1

prev_state = await self.current_state_for_user(user_id)

# If they're busy then they don't stop being busy just by syncing,
# so just update the last sync time.
if prev_state.state != PresenceState.BUSY:
# XXX: We set_state separately here and just update the last_active_ts above
# This keeps the logic as similar as possible between the worker and single
# process modes. Using set_state will actually cause last_active_ts to be
# updated always, which is not what the spec calls for, but synapse has done
# this for... forever, I think.
await self.set_state(
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)
# Retrieve the new state for the logic below. This should come from the
# in-memory cache.
prev_state = await self.current_state_for_user(user_id)

# To keep the single process behaviour consistent with worker mode, run the
# same logic as `update_external_syncs_row`, even though it looks weird.
if prev_state.state == PresenceState.OFFLINE:
await self._update_states(
[
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)
]
)
# otherwise, set the new presence state & update the last sync time,
# but don't update last_active_ts as this isn't an indication that
# they've been active (even though it's probably been updated by
# set_state above)
else:
await self._update_states(
[prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
)
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
state={"presence": presence_state},
is_sync=True,
)

async def _end() -> None:
try:
Expand Down Expand Up @@ -1080,32 +1045,27 @@ async def update_external_syncs_row(
process_id, set()
)

updates = []
# USER_SYNC is sent when a user starts or stops syncing on a remote
# process. (But only for the initial and last device.)
#
# When a user *starts* syncing it also calls set_state(...) which
# will update the state, last_active_ts, and last_user_sync_ts.
# Simply ensure the user is tracked as syncing in this case.
#
# When a user *stops* syncing, update the last_user_sync_ts and mark
# them as no longer syncing. Note this doesn't quite match the
# monolith behaviour, which updates last_user_sync_ts at the end of
# every sync, not just the last in-flight sync.
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
updates.append(
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=sync_time_msec,
last_user_sync_ts=sync_time_msec,
)
)
else:
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)
process_presence.add(user_id)
elif user_id in process_presence:
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
elif not is_syncing and user_id in process_presence:
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
await self._update_states([new_state])

if not is_syncing:
process_presence.discard(user_id)

if updates:
await self._update_states(updates)

self.external_process_last_updated_ms[process_id] = self.clock.time_msec()

async def update_external_syncs_clear(self, process_id: str) -> None:
Expand Down Expand Up @@ -1204,17 +1164,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.

Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]
Expand All @@ -1227,18 +1189,27 @@ async def set_state(
return

user_id = target_user.to_string()
now = self.clock.time_msec()

prev_state = await self.current_state_for_user(user_id)

# Syncs do not override a previous presence of busy.
#
# TODO: This is a hack for lack of multi-device support. Unfortunately
# removing this requires coordination with clients.
if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY

new_fields = {"state": presence}

if not ignore_status_msg:
new_fields["status_msg"] = status_msg
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
new_fields["last_active_ts"] = now

if presence == PresenceState.ONLINE or (
presence == PresenceState.BUSY and self._busy_presence_enabled
):
new_fields["last_active_ts"] = self.clock.time_msec()
if is_sync:
new_fields["last_user_sync_ts"] = now
else:
# Syncs do not override the status message.
new_fields["status_msg"] = status_msg

await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
Expand Down
10 changes: 5 additions & 5 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):

{
"state": { ... },
"ignore_status_msg": false,
"force_notify": false
"force_notify": false,
"is_sync": false
}

200 OK
Expand All @@ -96,13 +96,13 @@ def __init__(self, hs: "HomeServer"):
async def _serialize_payload( # type: ignore[override]
user_id: str,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> JsonDict:
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
"is_sync": is_sync,
}

async def _handle_request( # type: ignore[override]
Expand All @@ -111,8 +111,8 @@ async def _handle_request( # type: ignore[override]
await self._presence_handler.set_state(
UserID.from_string(user_id),
content["state"],
content["ignore_status_msg"],
content["force_notify"],
content.get("is_sync", False),
)

return (200, {})
Expand Down
37 changes: 30 additions & 7 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,20 @@ def test_external_process_timeout(self) -> None:
"""Test that if an external process doesn't update the records for a while
we time out their syncing users presence.
"""
process_id = "1"

# Notify handler that a user is now syncing.
# Create a worker and use it to handle /sync traffic instead.
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)
worker_presence_handler = worker_to_sync_against.get_presence_handler()

self.get_success(
self.presence_handler.update_external_syncs_row(
process_id, self.user_id, True, self.clock.time_msec()
)
worker_presence_handler.user_syncing(
self.user_id, True, PresenceState.ONLINE
),
by=0.1,
)

# Check that if we wait a while without telling the handler the user has
Expand Down Expand Up @@ -820,7 +827,7 @@ def test_set_presence_from_syncing_keeps_busy(
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "presence_writer"}
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)

# Set presence to BUSY
Expand All @@ -832,14 +839,30 @@ def test_set_presence_from_syncing_keeps_busy(
self.get_success(
worker_to_sync_against.get_presence_handler().user_syncing(
self.user_id, True, PresenceState.ONLINE
)
),
by=0.1,
)

# Check against the main process that the user's presence did not change.
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be busy
self.assertEqual(state.state, PresenceState.BUSY)

# Advance such that the device would be discarded if it was not busy,
# then pump so _handle_timeouts function to called.
self.reactor.advance(IDLE_TIMER / 1000)
self.reactor.pump([5])

# The account should still be busy.
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.BUSY)

# Ensure that a /presence call can set the user *off* busy.
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)

def _set_presencestate_with_status_msg(
self, state: str, status_msg: Optional[str]
) -> None:
Expand Down