From f9caec5d9a9396f5388fcade48f494557731cb82 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Jul 2022 13:03:00 +0200 Subject: [PATCH 1/9] Fix rooms not being properly excluded from incremental sync --- synapse/handlers/sync.py | 25 +++++++++++++------- synapse/storage/databases/main/events.py | 8 ++++++- synapse/storage/databases/main/roommember.py | 19 +++++++++++---- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d42a414c9040..480b56a507c8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1536,14 +1536,16 @@ async def _generate_sync_entry_for_rooms( ignored_users = await self.store.ignored_users(user_id) if since_token: room_changes = await self._get_rooms_changed( - sync_result_builder, ignored_users, self.rooms_to_exclude + sync_result_builder, + ignored_users, ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: room_changes = await self._get_all_rooms( - sync_result_builder, ignored_users, self.rooms_to_exclude + sync_result_builder, + ignored_users, ) tags_by_room = await self.store.get_tags_for_user(user_id) @@ -1623,13 +1625,14 @@ async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], - excluded_rooms: List[str], ) -> _RoomChanges: """Determine the changes in rooms to report to the user. This function is a first pass at generating the rooms part of the sync response. It determines which rooms have changed during the sync period, and categorises - them into four buckets: "knock", "invite", "join" and "leave". + them into four buckets: "knock", "invite", "join" and "leave". It also excludes + from that list any room that appears in the list of rooms to exclude from sync + results in the server configuration. 1. Finds all membership changes for the user in the sync period (from `since_token` up to `now_token`). @@ -1655,7 +1658,7 @@ async def _get_rooms_changed( # _have_rooms_changed. We could keep the results in memory to avoid a # second query, at the cost of more complicated source code. membership_change_events = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key, excluded_rooms + user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude ) mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} @@ -1862,7 +1865,6 @@ async def _get_all_rooms( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], - ignored_rooms: List[str], ) -> _RoomChanges: """Returns entries for all rooms for the user. @@ -1884,7 +1886,7 @@ async def _get_all_rooms( room_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, membership_list=Membership.LIST, - excluded_rooms=ignored_rooms, + excluded_rooms=self.rooms_to_exclude, ) room_entries = [] @@ -2150,7 +2152,9 @@ async def _generate_room_entry( raise Exception("Unrecognized rtype: %r", room_builder.rtype) async def get_rooms_for_user_at( - self, user_id: str, room_key: RoomStreamToken + self, + user_id: str, + room_key: RoomStreamToken, ) -> FrozenSet[str]: """Get set of joined rooms for a user at the given stream ordering. @@ -2165,7 +2169,10 @@ async def get_rooms_for_user_at( ReturnValue: Set of room_ids the user is in at given stream_ordering. """ - joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) + joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering( + user_id, + exclude_for_sync=True, + ) joined_room_ids = set() diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f600f119029..9bd226a6924e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1172,9 +1172,15 @@ def _update_current_state_txn( # Invalidate the various caches for member in members_changed: + # We need to invalidate the cache for get_rooms_for_user_with_stream_ordering + # both with and without excluding rooms for sync results. txn.call_after( self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member,), + (member, False), + ) + txn.call_after( + self.store.get_rooms_for_user_with_stream_ordering.invalidate, + (member, True), ) self.store._invalidate_state_caches_and_stream( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e2cccc688c9f..d0b4a595a6c0 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -121,6 +121,8 @@ def __init__( lambda: self._known_servers_count, ) + self._rooms_excluded_from_sync = hs.config.server.rooms_to_exclude_from_sync + @wrap_as_background_process("_count_known_servers") async def _count_known_servers(self) -> int: """ @@ -565,7 +567,9 @@ async def get_local_current_membership_for_user_in_room( @cached(max_entries=500000, iterable=True) async def get_rooms_for_user_with_stream_ordering( - self, user_id: str + self, + user_id: str, + exclude_for_sync: bool = False, ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: """Returns a set of room_ids the user is currently joined to. @@ -574,6 +578,8 @@ async def get_rooms_for_user_with_stream_ordering( Args: user_id + exclude_for_sync: Whether to exclude rooms that the configuration says + should be excluded from sync results. Returns: Returns the rooms the user is in currently, along with the stream @@ -584,10 +590,11 @@ async def get_rooms_for_user_with_stream_ordering( "get_rooms_for_user_with_stream_ordering", self._get_rooms_for_user_with_stream_ordering_txn, user_id, + exclude_for_sync, ) def _get_rooms_for_user_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_id: str + self, txn: LoggingTransaction, user_id: str, exclude_for_sync: bool ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: # We use `current_state_events` here and not `local_current_membership` # as a) this gets called with remote users and b) this only gets called @@ -620,6 +627,10 @@ def _get_rooms_for_user_with_stream_ordering_txn( room_id, PersistedEventPosition(instance, stream_id) ) for room_id, instance, stream_id in txn + if ( + exclude_for_sync is False + or room_id not in self._rooms_excluded_from_sync + ) ) @cachedList( @@ -729,9 +740,7 @@ async def get_rooms_for_user( If a remote user only returns rooms this server is currently participating in. """ - rooms = await self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate - ) + rooms = await self.get_rooms_for_user_with_stream_ordering(user_id) return frozenset(r.room_id for r in rooms) @cached(max_entries=10000) From 8758612689717c66a5026d8349bd58929bb33a4d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Jul 2022 13:09:11 +0200 Subject: [PATCH 2/9] Add test --- tests/rest/client/test_sync.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index b085c50356aa..ae161848281c 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -948,3 +948,24 @@ def test_invite(self) -> None: self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"]) self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"]) + + def test_incremental_sync(self) -> None: + """Tests that activity in the room is properly filtered out of incremental + syncs. + """ + channel = self.make_request("GET", "/sync", access_token=self.tok) + self.assertEqual(channel.code, 200, channel.result) + next_batch = channel.json_body["next_batch"] + + self.helper.send(self.excluded_room_id, tok=self.tok) + self.helper.send(self.included_room_id, tok=self.tok) + + channel = self.make_request( + "GET", + f"/sync?since={next_batch}", + access_token=self.tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"]) + self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"]) From 50662685a4539316218c5829a62292b5203f1085 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Jul 2022 13:19:43 +0200 Subject: [PATCH 3/9] Changelog --- changelog.d/13408.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13408.bugfix diff --git a/changelog.d/13408.bugfix b/changelog.d/13408.bugfix new file mode 100644 index 000000000000..8b87b2cf7be5 --- /dev/null +++ b/changelog.d/13408.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.57.0 where rooms listed in `exclude_rooms_from_sync` in the configuration file would not be properly excluded from incremental syncs. From 84dd001e094e3149202e21246fa042ff2f177427 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Jul 2022 13:26:11 +0200 Subject: [PATCH 4/9] Formatting woes --- synapse/handlers/sync.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 480b56a507c8..009f01d39c44 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1536,17 +1536,13 @@ async def _generate_sync_entry_for_rooms( ignored_users = await self.store.ignored_users(user_id) if since_token: room_changes = await self._get_rooms_changed( - sync_result_builder, - ignored_users, + sync_result_builder, ignored_users ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - room_changes = await self._get_all_rooms( - sync_result_builder, - ignored_users, - ) + room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) tags_by_room = await self.store.get_tags_for_user(user_id) log_kv({"rooms_changed": len(room_changes.room_entries)}) From c6cc614caf0505360f8d0da10b6658fb588ba819 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 29 Jul 2022 10:48:24 +0200 Subject: [PATCH 5/9] Re-add invalidate pass-through, not sure how that went away --- synapse/storage/databases/main/roommember.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index d0b4a595a6c0..912b4f61e46d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -740,7 +740,9 @@ async def get_rooms_for_user( If a remote user only returns rooms this server is currently participating in. """ - rooms = await self.get_rooms_for_user_with_stream_ordering(user_id) + rooms = await self.get_rooms_for_user_with_stream_ordering( + user_id, on_invalidate=on_invalidate + ) return frozenset(r.room_id for r in rooms) @cached(max_entries=10000) From 77633ab7e974ad4d76d289031b8dfd0e8912129c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 29 Jul 2022 10:50:15 +0200 Subject: [PATCH 6/9] Correctly invalidate cache on replication --- synapse/storage/databases/main/cache.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 12e9a423826a..3004150c17dd 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -202,8 +202,13 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) if data.type == EventTypes.Member: + # We need to invalidate the cache for get_rooms_for_user_with_stream_ordering + # both with and without excluding rooms for sync results. self.get_rooms_for_user_with_stream_ordering.invalidate( - (data.state_key,) + (data.state_key, False) + ) + self.get_rooms_for_user_with_stream_ordering.invalidate( + (data.state_key, True) ) else: raise Exception("Unknown events stream row type %s" % (row.type,)) From 8243b886226625b46b1493c6a0f5a042bf7b4a12 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 1 Aug 2022 17:31:25 +0200 Subject: [PATCH 7/9] Implement Erik's suggestion --- synapse/handlers/sync.py | 24 ++++++++++++++++---- synapse/storage/databases/main/cache.py | 7 +----- synapse/storage/databases/main/events.py | 8 +------ synapse/storage/databases/main/roommember.py | 12 +++------- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 009f01d39c44..ec9e10e1fed3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,7 +28,7 @@ from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import NotifCounts -from synapse.storage.roommember import MemberSummary +from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( DeviceListUpdates, @@ -42,6 +42,7 @@ UserID, ) from synapse.util.async_helpers import concurrently_execute +from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext @@ -2165,10 +2166,7 @@ async def get_rooms_for_user_at( ReturnValue: Set of room_ids the user is in at given stream_ordering. """ - joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering( - user_id, - exclude_for_sync=True, - ) + joined_rooms = await self._get_filtered_joined_rooms(user_id) joined_room_ids = set() @@ -2199,6 +2197,22 @@ async def get_rooms_for_user_at( return frozenset(joined_room_ids) + @cached(cache_context=True, iterable=True) + async def _get_filtered_joined_rooms( + self, user_id: str, cache_context: _CacheContext + ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: + joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering( + user_id, + on_invalidate=cache_context.invalidate, + ) + + if len(self.rooms_to_exclude) == 0: + return joined_rooms + + return frozenset( + r for r in joined_rooms if r.room_id not in self.rooms_to_exclude + ) + def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 3004150c17dd..12e9a423826a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -202,13 +202,8 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) if data.type == EventTypes.Member: - # We need to invalidate the cache for get_rooms_for_user_with_stream_ordering - # both with and without excluding rooms for sync results. self.get_rooms_for_user_with_stream_ordering.invalidate( - (data.state_key, False) - ) - self.get_rooms_for_user_with_stream_ordering.invalidate( - (data.state_key, True) + (data.state_key,) ) else: raise Exception("Unknown events stream row type %s" % (row.type,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 9bd226a6924e..1f600f119029 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1172,15 +1172,9 @@ def _update_current_state_txn( # Invalidate the various caches for member in members_changed: - # We need to invalidate the cache for get_rooms_for_user_with_stream_ordering - # both with and without excluding rooms for sync results. txn.call_after( self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member, False), - ) - txn.call_after( - self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member, True), + (member,), ) self.store._invalidate_state_caches_and_stream( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 912b4f61e46d..a08d226feb55 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -121,8 +121,6 @@ def __init__( lambda: self._known_servers_count, ) - self._rooms_excluded_from_sync = hs.config.server.rooms_to_exclude_from_sync - @wrap_as_background_process("_count_known_servers") async def _count_known_servers(self) -> int: """ @@ -569,7 +567,6 @@ async def get_local_current_membership_for_user_in_room( async def get_rooms_for_user_with_stream_ordering( self, user_id: str, - exclude_for_sync: bool = False, ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: """Returns a set of room_ids the user is currently joined to. @@ -590,11 +587,12 @@ async def get_rooms_for_user_with_stream_ordering( "get_rooms_for_user_with_stream_ordering", self._get_rooms_for_user_with_stream_ordering_txn, user_id, - exclude_for_sync, ) def _get_rooms_for_user_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_id: str, exclude_for_sync: bool + self, + txn: LoggingTransaction, + user_id: str, ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: # We use `current_state_events` here and not `local_current_membership` # as a) this gets called with remote users and b) this only gets called @@ -627,10 +625,6 @@ def _get_rooms_for_user_with_stream_ordering_txn( room_id, PersistedEventPosition(instance, stream_id) ) for room_id, instance, stream_id in txn - if ( - exclude_for_sync is False - or room_id not in self._rooms_excluded_from_sync - ) ) @cachedList( From f05b984b52993ff96964b1856215f90890c1c775 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 1 Aug 2022 17:33:12 +0200 Subject: [PATCH 8/9] Cleanup --- synapse/storage/databases/main/roommember.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a08d226feb55..e2cccc688c9f 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -565,8 +565,7 @@ async def get_local_current_membership_for_user_in_room( @cached(max_entries=500000, iterable=True) async def get_rooms_for_user_with_stream_ordering( - self, - user_id: str, + self, user_id: str ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: """Returns a set of room_ids the user is currently joined to. @@ -575,8 +574,6 @@ async def get_rooms_for_user_with_stream_ordering( Args: user_id - exclude_for_sync: Whether to exclude rooms that the configuration says - should be excluded from sync results. Returns: Returns the rooms the user is in currently, along with the stream @@ -590,9 +587,7 @@ async def get_rooms_for_user_with_stream_ordering( ) def _get_rooms_for_user_with_stream_ordering_txn( - self, - txn: LoggingTransaction, - user_id: str, + self, txn: LoggingTransaction, user_id: str ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: # We use `current_state_events` here and not `local_current_membership` # as a) this gets called with remote users and b) this only gets called From e78f7bd14107d24d384a02899e148771587ddb7d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 2 Aug 2022 12:24:45 +0200 Subject: [PATCH 9/9] Incorporate suggestion --- synapse/handlers/sync.py | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec9e10e1fed3..d827c03ad187 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,7 +28,7 @@ from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import NotifCounts -from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, MemberSummary +from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( DeviceListUpdates, @@ -42,7 +42,6 @@ UserID, ) from synapse.util.async_helpers import concurrently_execute -from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext @@ -2166,7 +2165,7 @@ async def get_rooms_for_user_at( ReturnValue: Set of room_ids the user is in at given stream_ordering. """ - joined_rooms = await self._get_filtered_joined_rooms(user_id) + joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) joined_room_ids = set() @@ -2177,7 +2176,12 @@ async def get_rooms_for_user_at( # If the membership's stream ordering is after the given stream # ordering, we need to go and work out if the user was in the room # before. + # We also need to check whether the room should be excluded from sync + # responses as per the homeserver config. for joined_room in joined_rooms: + if joined_room.room_id in self.rooms_to_exclude: + continue + if not joined_room.event_pos.persisted_after(room_key): joined_room_ids.add(joined_room.room_id) continue @@ -2197,22 +2201,6 @@ async def get_rooms_for_user_at( return frozenset(joined_room_ids) - @cached(cache_context=True, iterable=True) - async def _get_filtered_joined_rooms( - self, user_id: str, cache_context: _CacheContext - ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: - joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering( - user_id, - on_invalidate=cache_context.invalidate, - ) - - if len(self.rooms_to_exclude) == 0: - return joined_rooms - - return frozenset( - r for r in joined_rooms if r.room_id not in self.rooms_to_exclude - ) - def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: