This repository was archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Optimise get_rooms_for_user (drop with_stream_ordering) #13787
Merged
erikjohnston
merged 17 commits into
matrix-org:develop
from
beeper:optimise-get-rooms-for-user
Sep 29, 2022
Merged
Changes from 10 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
1008106
Reimplement `get_rooms_for_user` and `get_rooms_for_users`
Fizzadar c98bbfc
Replace calls to `get_rooms_for_users_with_stream_ordering`
Fizzadar 7210fa1
Drop unused `get_rooms_for_users_with_stream_ordering` method
Fizzadar 7719d9c
Drop simplified cache in sync tests
Fizzadar b745518
Linting fixes
Fizzadar 661e7cf
Fix call to `get_immediate`
Fizzadar 3df4579
Add placeholder changelog file
Fizzadar 0497004
Fix formatting
Fizzadar 7e127f8
Don't overwrite variable
Fizzadar b97ae86
Merge branch 'develop' into optimise-get-rooms-for-user
Fizzadar 7512017
Remove blank changelog line
Fizzadar 21c8e8d
Use `simple_select_many_batch` in `get_rooms_for_users`
Fizzadar 9907aaa
Use `simple_select_onecol` in `get_rooms_for_user`
Fizzadar ff3ae7e
Merge branch 'develop' into optimise-get-rooms-for-user
Fizzadar 4da7073
Fixed merge formatting
Fizzadar bd70af9
Fix `current_state_events` column name
Fizzadar 7b0e311
Always ensure we return a value for every user in `get_rooms_for_users`
Fizzadar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar). | ||
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,6 @@ | |
import logging | ||
from typing import ( | ||
TYPE_CHECKING, | ||
Callable, | ||
Collection, | ||
Dict, | ||
FrozenSet, | ||
|
@@ -52,7 +51,6 @@ | |
from synapse.util.async_helpers import Linearizer | ||
from synapse.util.caches import intern_string | ||
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList | ||
from synapse.util.cancellation import cancellable | ||
from synapse.util.iterutils import batch_iter | ||
from synapse.util.metrics import Measure | ||
|
||
|
@@ -600,58 +598,6 @@ def _get_rooms_for_user_with_stream_ordering_txn( | |
for room_id, instance, stream_id in txn | ||
) | ||
|
||
@cachedList( | ||
cached_method_name="get_rooms_for_user_with_stream_ordering", | ||
list_name="user_ids", | ||
) | ||
async def get_rooms_for_users_with_stream_ordering( | ||
self, user_ids: Collection[str] | ||
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: | ||
"""A batched version of `get_rooms_for_user_with_stream_ordering`. | ||
|
||
Returns: | ||
Map from user_id to set of rooms that is currently in. | ||
""" | ||
return await self.db_pool.runInteraction( | ||
"get_rooms_for_users_with_stream_ordering", | ||
self._get_rooms_for_users_with_stream_ordering_txn, | ||
user_ids, | ||
) | ||
|
||
def _get_rooms_for_users_with_stream_ordering_txn( | ||
self, txn: LoggingTransaction, user_ids: Collection[str] | ||
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: | ||
|
||
clause, args = make_in_list_sql_clause( | ||
self.database_engine, | ||
"c.state_key", | ||
user_ids, | ||
) | ||
|
||
sql = f""" | ||
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering | ||
FROM current_state_events AS c | ||
INNER JOIN events AS e USING (room_id, event_id) | ||
WHERE | ||
c.type = 'm.room.member' | ||
AND c.membership = ? | ||
AND {clause} | ||
""" | ||
|
||
txn.execute(sql, [Membership.JOIN] + args) | ||
|
||
result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = { | ||
user_id: set() for user_id in user_ids | ||
} | ||
for user_id, room_id, instance, stream_id in txn: | ||
result[user_id].add( | ||
GetRoomsForUserWithStreamOrdering( | ||
room_id, PersistedEventPosition(instance, stream_id) | ||
) | ||
) | ||
|
||
return {user_id: frozenset(v) for user_id, v in result.items()} | ||
|
||
async def get_users_server_still_shares_room_with( | ||
self, user_ids: Collection[str] | ||
) -> Set[str]: | ||
|
@@ -687,19 +633,86 @@ def _get_users_server_still_shares_room_with_txn( | |
_get_users_server_still_shares_room_with_txn, | ||
) | ||
|
||
@cancellable | ||
async def get_rooms_for_user( | ||
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None | ||
) -> FrozenSet[str]: | ||
@cached(max_entries=500000, iterable=True) | ||
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: | ||
"""Returns a set of room_ids the user is currently joined to. | ||
|
||
If a remote user only returns rooms this server is currently | ||
participating in. | ||
""" | ||
rooms = await self.get_rooms_for_user_with_stream_ordering( | ||
user_id, on_invalidate=on_invalidate | ||
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( | ||
(user_id,), | ||
None, | ||
update_metrics=False, | ||
) | ||
if rooms: | ||
return frozenset(r.room_id for r in rooms) | ||
|
||
return await self.db_pool.runInteraction( | ||
"get_rooms_for_user", | ||
self._get_rooms_for_user_txn, | ||
user_id, | ||
) | ||
|
||
def _get_rooms_for_user_txn( | ||
self, txn: LoggingTransaction, user_id: str | ||
) -> FrozenSet[str]: | ||
sql = """ | ||
SELECT room_id | ||
FROM current_state_events AS c | ||
WHERE | ||
c.type = 'm.room.member' | ||
AND c.state_key = ? | ||
AND c.membership = ? | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may as well just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
txn.execute(sql, (user_id, Membership.JOIN)) | ||
return frozenset(row[0] for row in txn) | ||
|
||
@cachedList( | ||
cached_method_name="get_rooms_for_user", | ||
list_name="user_ids", | ||
) | ||
async def get_rooms_for_users( | ||
self, user_ids: Collection[str] | ||
) -> Dict[str, FrozenSet[str]]: | ||
"""A batched version of `get_rooms_for_user`. | ||
|
||
Returns: | ||
Map from user_id to set of rooms that is currently in. | ||
""" | ||
return await self.db_pool.runInteraction( | ||
"get_rooms_for_users", | ||
self._get_rooms_for_users_txn, | ||
user_ids, | ||
) | ||
return frozenset(r.room_id for r in rooms) | ||
|
||
def _get_rooms_for_users_txn( | ||
self, txn: LoggingTransaction, user_ids: Collection[str] | ||
) -> Dict[str, FrozenSet[str]]: | ||
|
||
clause, args = make_in_list_sql_clause( | ||
self.database_engine, | ||
"c.state_key", | ||
user_ids, | ||
) | ||
|
||
sql = f""" | ||
SELECT c.state_key, room_id | ||
FROM current_state_events AS c | ||
WHERE | ||
c.type = 'm.room.member' | ||
AND c.membership = ? | ||
AND {clause} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto, I think we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
""" | ||
|
||
txn.execute(sql, [Membership.JOIN] + args) | ||
|
||
result: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} | ||
for user_id, room_id in txn: | ||
result[user_id].add(room_id) | ||
|
||
return {user_id: frozenset(v) for user_id, v in result.items()} | ||
|
||
@cached(max_entries=10000) | ||
async def does_pair_of_users_share_a_room( | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.