Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

When populating the user directory, query remote servers for user profiles instead of leaking the profiles in private rooms. [rei:userdirpriv] #15091

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15091.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
7 changes: 7 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ def should_send_federation(self) -> bool:
def get_replication_notifier(self) -> ReplicationNotifier:
return ReplicationNotifier()

def get_user_directory_handler(self) -> object:
class FakeUserDirectoryHandler:
def kick_off_remote_profile_refresh_process(self) -> None:
pass

return FakeUserDirectoryHandler()


class Porter:
def __init__(
Expand Down
18 changes: 18 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,21 @@ class RemoteServerUpCommand(_SimpleCommand):
NAME = "REMOTE_SERVER_UP"


class ReadyToRefreshStaleUserDirectoryProfilesCommand(_SimpleCommand):
"""
Sent when a worker needs to tell the user directory worker that there are
stale remote user profiles that require refreshing.

Triggered when the user directory background update has been completed.

Format::

USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES ''
"""

NAME = "USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES"


_COMMANDS: Tuple[Type[Command], ...] = (
ServerCommand,
RdataCommand,
Expand All @@ -435,6 +450,7 @@ class RemoteServerUpCommand(_SimpleCommand):
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
ReadyToRefreshStaleUserDirectoryProfilesCommand,
)

# Map of command name to command type.
Expand All @@ -448,6 +464,7 @@ class RemoteServerUpCommand(_SimpleCommand):
ErrorCommand.NAME,
PingCommand.NAME,
RemoteServerUpCommand.NAME,
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
)

# The commands the client is allowed to send
Expand All @@ -461,6 +478,7 @@ class RemoteServerUpCommand(_SimpleCommand):
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
)


Expand Down
7 changes: 6 additions & 1 deletion synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
"populate_user_directory_process_rooms",
),
(
"populate_user_directory_cleanup",
"populate_user_directory_process_remote_users",
"{}",
"populate_user_directory_process_users",
),
(
"populate_user_directory_cleanup",
"{}",
"populate_user_directory_process_remote_users",
),
]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
Expand Down
189 changes: 173 additions & 16 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
cast,
)

from synapse.replication.tcp.commands import (
ReadyToRefreshStaleUserDirectoryProfilesCommand,
)

try:
# Figure out if ICU support is available for searching users.
import icu
Expand Down Expand Up @@ -91,17 +95,32 @@ def __init__(
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_process_users",
self._populate_user_directory_process_users,
self._populate_user_directory_process_local_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_process_remote_users",
self._populate_user_directory_process_remote_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)

@staticmethod
def _delete_staging_area(txn: LoggingTransaction) -> None:
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute(
"DROP TABLE IF EXISTS " + TEMP_TABLE + "_remote_users_needing_lookup"
)
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")

async def _populate_user_directory_createtables(
self, progress: JsonDict, batch_size: int
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
# Clear out any tables if they already exist beforehand.
UserDirectoryBackgroundUpdateStore._delete_staging_area(txn)

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
Expand Down Expand Up @@ -142,6 +161,18 @@ def _make_staging_area(txn: LoggingTransaction) -> None:
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
)

# A table for storing a list of remote users that *may* need a remote
# lookup in order to obtain a public profile.
# The list should be compared against the user directory's cache
# to see whether any queries can be skipped because the remote user
# also appeared in a public room.
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_remote_users_needing_lookup(user_id TEXT PRIMARY KEY NOT NULL)"
)
txn.execute(sql)

new_pos = await self.get_max_stream_id_in_current_state_deltas()
await self.db_pool.runInteraction(
"populate_user_directory_temp_build", _make_staging_area
Expand All @@ -168,13 +199,9 @@ async def _populate_user_directory_cleanup(
)
await self.update_user_directory_stream_pos(position)

def _delete_staging_area(txn: LoggingTransaction) -> None:
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")

await self.db_pool.runInteraction(
"populate_user_directory_cleanup", _delete_staging_area
"populate_user_directory_cleanup",
UserDirectoryBackgroundUpdateStore._delete_staging_area,
)

await self.db_pool.updates._end_background_update(
Expand Down Expand Up @@ -262,10 +289,17 @@ def _get_next_batch(
or await self.should_include_local_user_in_dir(user_id)
}

# Determine whether the room is public
is_public = await self.is_room_world_readable_or_publicly_joinable(
room_id
)

remote_users_to_query_later = set()

# Upsert a user_directory record for each remote user we see.
for user_id, profile in users_with_profile.items():
# Local users are processed separately in
# `_populate_user_directory_users`; there we can read from
# `_populate_user_directory_local_users`; there we can read from
# the `profiles` table to ensure we don't leak their per-room
# profiles. It also means we write local users to this table
# exactly once, rather than once for every room they're in.
Expand All @@ -274,14 +308,29 @@ def _get_next_batch(
# TODO `users_with_profile` above reads from the `user_directory`
# table, meaning that `profile` is bespoke to this room.
# and this leaks remote users' per-room profiles to the user directory.
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
if is_public:
# If this is a public room, it's acceptable to add the profile
# into the user directory.
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
else:
# Otherwise query the user at a later time
remote_users_to_query_later.add(user_id)

# (insert the remote users needing a query in batch;
# use upsert with no values for 'INSERT OR IGNORE' semantics)
await self.db_pool.simple_upsert_many(
f"{TEMP_TABLE}_remote_users_needing_lookup",
("user_id",),
[(u,) for u in remote_users_to_query_later],
(),
(),
desc="populate_user_directory_queue_remote_needing_lookup",
)
del remote_users_to_query_later

# Now update the room sharing tables to include this room.
is_public = await self.is_room_world_readable_or_publicly_joinable(
room_id
)
if is_public:
if users_with_profile:
await self.add_users_in_public_rooms(
Expand Down Expand Up @@ -336,7 +385,7 @@ def _get_next_batch(

return processed_event_count

async def _populate_user_directory_process_users(
async def _populate_user_directory_process_local_users(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Expand Down Expand Up @@ -404,6 +453,114 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:

return len(users_to_work_on)

async def _populate_user_directory_process_remote_users(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Sorts through the `_remote_users_needing_lookup` table and adds the
users within to the list of stale remote profiles,
unless we already populated a user directory entry for them (i.e. they were
also in a public room).
"""

def _get_next_batch_txn(
txn: LoggingTransaction, done_up_to_user_id: str
) -> Optional[str]:
"""
Given the last user ID we've processed,
Returns
- a user ID to process up to and including; or
- `None` if there is no limit left (i.e. we should just process all
remaining rows).
"""
# Should be a B-Tree index only scan: so reasonably efficient despite the
# OFFSET
# If we're lucky, will also warm up the disk cache for the subsequent query
# that actually does some work.
txn.execute(
f"""
SELECT user_id
FROM {TEMP_TABLE}_remote_users_needing_lookup
WHERE user_id > ?
ORDER BY user_id
LIMIT 1 OFFSET ?
""",
(done_up_to_user_id, batch_size),
)
row = txn.fetchone()
if row:
return row[0]
else:
return None

def _add_private_only_users_to_stale_profile_refresh_queue_txn(
txn: LoggingTransaction, from_exc: str, until_inc: Optional[str]
) -> None:
end_condition = "AND user_id <= ?" if until_inc is not None else ""
end_args = (until_inc,) if until_inc is not None else ()

user_id_serverpart: str
if isinstance(self.database_engine, PostgresEngine):
user_id_serverpart = (
"SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)"
)
elif isinstance(self.database_engine, Sqlite3Engine):
user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)"
else:
raise RuntimeError("Unknown database engine!")

txn.execute(
f"""
INSERT INTO user_directory_stale_remote_users
(user_id, next_try_at_ts, retry_counter, user_server_name)
SELECT
user_id, 0, 0, {user_id_serverpart}
FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl
LEFT JOIN user_directory AS ud USING (user_id)
WHERE ud.user_id IS NULL
AND ? < user_id {end_condition}
""",
(from_exc,) + end_args,
)

def _do_txn(txn: LoggingTransaction) -> None:
"""
Does a step of background update.
"""
last_user_id = progress.get("last_user_id", "@")
next_end_limit_inc = _get_next_batch_txn(txn, last_user_id)
_add_private_only_users_to_stale_profile_refresh_queue_txn(
txn, last_user_id, next_end_limit_inc
)

# Update the progress
progress["last_user_id"] = next_end_limit_inc
self.db_pool.updates._background_update_progress_txn(
txn, "populate_user_directory_process_remote_users", progress
)

if progress.get("last_user_id", "@") is None:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_remote_users"
)

# Now kick off querying remote homeservers for profile information.
if self.hs.config.worker.should_update_user_directory:
self.hs.get_user_directory_handler().kick_off_remote_profile_refresh_process()
else:
command_handler = self.hs.get_replication_command_handler()
command_handler.send_command(
ReadyToRefreshStaleUserDirectoryProfilesCommand("")
)

return 1

await self.db_pool.runInteraction(
"populate_user_directory_process_remote_users",
_do_txn,
)
return batch_size

async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Is this user one of them?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Rebuild the user directory in light of the fix for leaking the per-room
-- profiles of remote users to the user directory.

-- First cancel any existing rebuilds if already pending; we'll run from fresh.
DELETE FROM background_updates WHERE update_name IN (
'populate_user_directory_createtables',
'populate_user_directory_process_rooms',
'populate_user_directory_process_users',
'populate_user_directory_process_remote_users',
'populate_user_directory_cleanup'
);

-- Then schedule the steps.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-- Set up user directory staging tables.
(7402, 'populate_user_directory_createtables', '{}', NULL),
-- Run through each room and update the user directory according to who is in it.
(7402, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'),
-- Insert all users into the user directory, if search_all_users is on.
(7402, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'),
-- Insert remote users into the queue for fetching.
(7402, 'populate_user_directory_process_remote_users', '{}', 'populate_user_directory_process_users'),
-- Clean up user directory staging tables.
(7402, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_remote_users')
ON CONFLICT (update_name) DO NOTHING;
7 changes: 4 additions & 3 deletions tests/handlers/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,10 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user_dir_handler = hs.get_user_directory_handler()
self.profile_handler = hs.get_profile_handler()

# Cancel the startup call: in the steady-state case we can't rely on it anyway.
assert self.user_dir_handler._refresh_remote_profiles_call_later is not None
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
if self.user_dir_handler._refresh_remote_profiles_call_later is not None:
# Cancel the startup call: in the steady-state case we can't rely on
# it anyway.
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()

def test_public_rooms_have_profiles_collected(self) -> None:
"""
Expand Down