diff --git a/changelog.d/15091.bugfix b/changelog.d/15091.bugfix new file mode 100644 index 000000000000..12f979e9d041 --- /dev/null +++ b/changelog.d/15091.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change. \ No newline at end of file diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 2c9cbf8b275b..29a5ef4311da 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -283,6 +283,13 @@ def should_send_federation(self) -> bool: def get_replication_notifier(self) -> ReplicationNotifier: return ReplicationNotifier() + def get_user_directory_handler(self) -> object: + class FakeUserDirectoryHandler: + def kick_off_remote_profile_refresh_process(self) -> None: + pass + + return FakeUserDirectoryHandler() + class Porter: def __init__( diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 32f52e54d8c7..df229fc0a374 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -422,6 +422,21 @@ class RemoteServerUpCommand(_SimpleCommand): NAME = "REMOTE_SERVER_UP" +class ReadyToRefreshStaleUserDirectoryProfilesCommand(_SimpleCommand): + """ + Sent when a worker needs to tell the user directory worker that there are + stale remote user profiles that require refreshing. + + Triggered when the user directory background update has been completed. + + Format:: + + USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES '' + """ + + NAME = "USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES" + + _COMMANDS: Tuple[Type[Command], ...] = ( ServerCommand, RdataCommand, @@ -435,6 +450,7 @@ class RemoteServerUpCommand(_SimpleCommand): UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + ReadyToRefreshStaleUserDirectoryProfilesCommand, ) # Map of command name to command type. @@ -448,6 +464,7 @@ class RemoteServerUpCommand(_SimpleCommand): ErrorCommand.NAME, PingCommand.NAME, RemoteServerUpCommand.NAME, + ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME, ) # The commands the client is allowed to send @@ -461,6 +478,7 @@ class RemoteServerUpCommand(_SimpleCommand): UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, + ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME, ) diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 93a78db81186..a36bb1f8f13a 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -138,10 +138,15 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: "populate_user_directory_process_rooms", ), ( - "populate_user_directory_cleanup", + "populate_user_directory_process_remote_users", "{}", "populate_user_directory_process_users", ), + ( + "populate_user_directory_cleanup", + "{}", + "populate_user_directory_process_remote_users", + ), ] else: raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 97f09b73dde6..276d4fcec9f4 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -27,6 +27,10 @@ cast, ) +from synapse.replication.tcp.commands import ( + ReadyToRefreshStaleUserDirectoryProfilesCommand, +) + try: # Figure out if ICU support is available for searching users. import icu @@ -91,17 +95,32 @@ def __init__( ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_process_users", - self._populate_user_directory_process_users, + self._populate_user_directory_process_local_users, + ) + self.db_pool.updates.register_background_update_handler( + "populate_user_directory_process_remote_users", + self._populate_user_directory_process_remote_users, ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) + @staticmethod + def _delete_staging_area(txn: LoggingTransaction) -> None: + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") + txn.execute( + "DROP TABLE IF EXISTS " + TEMP_TABLE + "_remote_users_needing_lookup" + ) + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + async def _populate_user_directory_createtables( self, progress: JsonDict, batch_size: int ) -> int: - # Get all the rooms that we want to process. def _make_staging_area(txn: LoggingTransaction) -> None: + # Clear out any tables if they already exist beforehand. + UserDirectoryBackgroundUpdateStore._delete_staging_area(txn) + sql = ( "CREATE TABLE IF NOT EXISTS " + TEMP_TABLE @@ -142,6 +161,18 @@ def _make_staging_area(txn: LoggingTransaction) -> None: txn, TEMP_TABLE + "_users", keys=("user_id",), values=users ) + # A table for storing a list of remote users that *may* need a remote + # lookup in order to obtain a public profile. + # The list should be compared against the user directory's cache + # to see whether any queries can be skipped because the remote user + # also appeared in a public room. + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_remote_users_needing_lookup(user_id TEXT PRIMARY KEY NOT NULL)" + ) + txn.execute(sql) + new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( "populate_user_directory_temp_build", _make_staging_area @@ -168,13 +199,9 @@ async def _populate_user_directory_cleanup( ) await self.update_user_directory_stream_pos(position) - def _delete_staging_area(txn: LoggingTransaction) -> None: - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - await self.db_pool.runInteraction( - "populate_user_directory_cleanup", _delete_staging_area + "populate_user_directory_cleanup", + UserDirectoryBackgroundUpdateStore._delete_staging_area, ) await self.db_pool.updates._end_background_update( @@ -262,10 +289,17 @@ def _get_next_batch( or await self.should_include_local_user_in_dir(user_id) } + # Determine whether the room is public + is_public = await self.is_room_world_readable_or_publicly_joinable( + room_id + ) + + remote_users_to_query_later = set() + # Upsert a user_directory record for each remote user we see. for user_id, profile in users_with_profile.items(): # Local users are processed separately in - # `_populate_user_directory_users`; there we can read from + # `_populate_user_directory_local_users`; there we can read from # the `profiles` table to ensure we don't leak their per-room # profiles. It also means we write local users to this table # exactly once, rather than once for every room they're in. @@ -274,14 +308,29 @@ def _get_next_batch( # TODO `users_with_profile` above reads from the `user_directory` # table, meaning that `profile` is bespoke to this room. # and this leaks remote users' per-room profiles to the user directory. - await self.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) + if is_public: + # If this is a public room, it's acceptable to add the profile + # into the user directory. + await self.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) + else: + # Otherwise query the user at a later time + remote_users_to_query_later.add(user_id) + + # (insert the remote users needing a query in batch; + # use upsert with no values for 'INSERT OR IGNORE' semantics) + await self.db_pool.simple_upsert_many( + f"{TEMP_TABLE}_remote_users_needing_lookup", + ("user_id",), + [(u,) for u in remote_users_to_query_later], + (), + (), + desc="populate_user_directory_queue_remote_needing_lookup", + ) + del remote_users_to_query_later # Now update the room sharing tables to include this room. - is_public = await self.is_room_world_readable_or_publicly_joinable( - room_id - ) if is_public: if users_with_profile: await self.add_users_in_public_rooms( @@ -336,7 +385,7 @@ def _get_next_batch( return processed_event_count - async def _populate_user_directory_process_users( + async def _populate_user_directory_process_local_users( self, progress: JsonDict, batch_size: int ) -> int: """ @@ -404,6 +453,114 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: return len(users_to_work_on) + async def _populate_user_directory_process_remote_users( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Sorts through the `_remote_users_needing_lookup` table and adds the + users within to the list of stale remote profiles, + unless we already populated a user directory entry for them (i.e. they were + also in a public room). + """ + + def _get_next_batch_txn( + txn: LoggingTransaction, done_up_to_user_id: str + ) -> Optional[str]: + """ + Given the last user ID we've processed, + Returns + - a user ID to process up to and including; or + - `None` if there is no limit left (i.e. we should just process all + remaining rows). + """ + # Should be a B-Tree index only scan: so reasonably efficient despite the + # OFFSET + # If we're lucky, will also warm up the disk cache for the subsequent query + # that actually does some work. + txn.execute( + f""" + SELECT user_id + FROM {TEMP_TABLE}_remote_users_needing_lookup + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET ? + """, + (done_up_to_user_id, batch_size), + ) + row = txn.fetchone() + if row: + return row[0] + else: + return None + + def _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn: LoggingTransaction, from_exc: str, until_inc: Optional[str] + ) -> None: + end_condition = "AND user_id <= ?" if until_inc is not None else "" + end_args = (until_inc,) if until_inc is not None else () + + user_id_serverpart: str + if isinstance(self.database_engine, PostgresEngine): + user_id_serverpart = ( + "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)" + else: + raise RuntimeError("Unknown database engine!") + + txn.execute( + f""" + INSERT INTO user_directory_stale_remote_users + (user_id, next_try_at_ts, retry_counter, user_server_name) + SELECT + user_id, 0, 0, {user_id_serverpart} + FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl + LEFT JOIN user_directory AS ud USING (user_id) + WHERE ud.user_id IS NULL + AND ? < user_id {end_condition} + """, + (from_exc,) + end_args, + ) + + def _do_txn(txn: LoggingTransaction) -> None: + """ + Does a step of background update. + """ + last_user_id = progress.get("last_user_id", "@") + next_end_limit_inc = _get_next_batch_txn(txn, last_user_id) + _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn, last_user_id, next_end_limit_inc + ) + + # Update the progress + progress["last_user_id"] = next_end_limit_inc + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_remote_users", progress + ) + + if progress.get("last_user_id", "@") is None: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_remote_users" + ) + + # Now kick off querying remote homeservers for profile information. + if self.hs.config.worker.should_update_user_directory: + self.hs.get_user_directory_handler().kick_off_remote_profile_refresh_process() + else: + command_handler = self.hs.get_replication_command_handler() + command_handler.send_command( + ReadyToRefreshStaleUserDirectoryProfilesCommand("") + ) + + return 1 + + await self.db_pool.runInteraction( + "populate_user_directory_process_remote_users", + _do_txn, + ) + return batch_size + async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. Is this user one of them? diff --git a/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql b/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql new file mode 100644 index 000000000000..8c6da8219b48 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql @@ -0,0 +1,40 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Rebuild the user directory in light of the fix for leaking the per-room +-- profiles of remote users to the user directory. + +-- First cancel any existing rebuilds if already pending; we'll run from fresh. +DELETE FROM background_updates WHERE update_name IN ( + 'populate_user_directory_createtables', + 'populate_user_directory_process_rooms', + 'populate_user_directory_process_users', + 'populate_user_directory_process_remote_users', + 'populate_user_directory_cleanup' +); + +-- Then schedule the steps. +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + -- Set up user directory staging tables. + (7402, 'populate_user_directory_createtables', '{}', NULL), + -- Run through each room and update the user directory according to who is in it. + (7402, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'), + -- Insert all users into the user directory, if search_all_users is on. + (7402, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'), + -- Insert remote users into the queue for fetching. + (7402, 'populate_user_directory_process_remote_users', '{}', 'populate_user_directory_process_users'), + -- Clean up user directory staging tables. + (7402, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_remote_users') +ON CONFLICT (update_name) DO NOTHING; diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index da4d24082648..872c95ffb0f1 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -1128,9 +1128,10 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.user_dir_handler = hs.get_user_directory_handler() self.profile_handler = hs.get_profile_handler() - # Cancel the startup call: in the steady-state case we can't rely on it anyway. - assert self.user_dir_handler._refresh_remote_profiles_call_later is not None - self.user_dir_handler._refresh_remote_profiles_call_later.cancel() + if self.user_dir_handler._refresh_remote_profiles_call_later is not None: + # Cancel the startup call: in the steady-state case we can't rely on + # it anyway. + self.user_dir_handler._refresh_remote_profiles_call_later.cancel() def test_public_rooms_have_profiles_collected(self) -> None: """