Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 13ea6fa

Browse files
author
Mathieu Velten
committed
Non lazy loading sync not blocking during fast join
Signed-off-by: Mathieu Velten <[email protected]>
1 parent 4db3331 commit 13ea6fa

File tree

8 files changed

+106
-15
lines changed

8 files changed

+106
-15
lines changed

changelog.d/14831.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Non lazy loading sync not blocking during fast join.

synapse/handlers/sync.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,11 +1817,32 @@ async def _generate_sync_entry_for_rooms(
18171817
)
18181818
sync_result_builder.now_token = now_token
18191819

1820+
# Retrieve rooms that got un partial stated in the meantime, only useful in case
1821+
# of a non lazy-loading-members sync.
1822+
un_partial_stated_rooms = set()
1823+
if not sync_result_builder.sync_config.filter_collection.lazy_load_members():
1824+
un_partial_state_rooms_since = 0
1825+
if sync_result_builder.since_token is not None:
1826+
un_partial_state_rooms_since = int(
1827+
sync_result_builder.since_token.un_partial_state_rooms_key
1828+
)
1829+
1830+
un_partial_state_rooms_now = int(now_token.un_partial_state_rooms_key)
1831+
if un_partial_state_rooms_since != un_partial_state_rooms_now:
1832+
un_partial_stated_rooms = (
1833+
await self.store.get_un_partial_stated_rooms_between(
1834+
un_partial_state_rooms_since,
1835+
un_partial_state_rooms_now,
1836+
)
1837+
)
1838+
18201839
# 2. We check up front if anything has changed, if it hasn't then there is
18211840
# no point in going further.
18221841
if not sync_result_builder.full_state:
18231842
if since_token and not ephemeral_by_room and not account_data_by_room:
1824-
have_changed = await self._have_rooms_changed(sync_result_builder)
1843+
have_changed = await self._have_rooms_changed(
1844+
sync_result_builder, un_partial_stated_rooms
1845+
)
18251846
log_kv({"rooms_have_changed": have_changed})
18261847
if not have_changed:
18271848
tags_by_room = await self.store.get_updated_tags(
@@ -1835,7 +1856,7 @@ async def _generate_sync_entry_for_rooms(
18351856
ignored_users = await self.store.ignored_users(user_id)
18361857
if since_token:
18371858
room_changes = await self._get_rooms_changed(
1838-
sync_result_builder, ignored_users
1859+
sync_result_builder, ignored_users, un_partial_stated_rooms
18391860
)
18401861
tags_by_room = await self.store.get_updated_tags(
18411862
user_id, since_token.account_data_key
@@ -1888,7 +1909,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
18881909
)
18891910

18901911
async def _have_rooms_changed(
1891-
self, sync_result_builder: "SyncResultBuilder"
1912+
self,
1913+
sync_result_builder: "SyncResultBuilder",
1914+
un_partial_stated_rooms: Set[str],
18921915
) -> bool:
18931916
"""Returns whether there may be any new events that should be sent down
18941917
the sync. Returns True if there are.
@@ -1905,6 +1928,11 @@ async def _have_rooms_changed(
19051928

19061929
stream_id = since_token.room_key.stream
19071930
for room_id in sync_result_builder.joined_room_ids:
1931+
# If a room has been un partial stated in the meantime,
1932+
# let's consider it has changes and deal with it accordingly
1933+
# in _get_rooms_changed.
1934+
if room_id in un_partial_stated_rooms:
1935+
return True
19081936
if self.store.has_room_changed_since(room_id, stream_id):
19091937
return True
19101938
return False
@@ -1913,6 +1941,7 @@ async def _get_rooms_changed(
19131941
self,
19141942
sync_result_builder: "SyncResultBuilder",
19151943
ignored_users: FrozenSet[str],
1944+
un_partial_stated_rooms: Set[str],
19161945
) -> _RoomChanges:
19171946
"""Determine the changes in rooms to report to the user.
19181947
@@ -2116,7 +2145,24 @@ async def _get_rooms_changed(
21162145
room_entry = room_to_events.get(room_id, None)
21172146

21182147
newly_joined = room_id in newly_joined_rooms
2119-
if room_entry:
2148+
2149+
# In case of a non lazy-loading-members sync we want to include
2150+
# rooms that got un partial stated in the meantime, and we need
2151+
# to include the full state of them.
2152+
if (
2153+
not sync_config.filter_collection.lazy_load_members()
2154+
and room_id in un_partial_stated_rooms
2155+
):
2156+
entry = RoomSyncResultBuilder(
2157+
room_id=room_id,
2158+
rtype="joined",
2159+
events=None,
2160+
newly_joined=True,
2161+
full_state=True,
2162+
since_token=None,
2163+
upto_token=now_token,
2164+
)
2165+
elif room_entry:
21202166
events, start_key = room_entry
21212167

21222168
prev_batch_token = now_token.copy_and_replace(
@@ -2186,6 +2232,13 @@ async def _get_all_rooms(
21862232
knocked = []
21872233

21882234
for event in room_list:
2235+
# Do not include rooms that we don't have the full state yet
2236+
# in case of non lazy-loading-members sync.
2237+
if (
2238+
not sync_config.filter_collection.lazy_load_members()
2239+
) and await self.store.is_partial_state_room(event.room_id):
2240+
continue
2241+
21892242
if event.room_version_id not in KNOWN_ROOM_VERSIONS:
21902243
continue
21912244

synapse/storage/databases/main/relations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
292292
to_device_key=0,
293293
device_list_key=0,
294294
groups_key=0,
295+
un_partial_state_rooms_key=0,
295296
)
296297

297298
return events[:limit], next_token

synapse/storage/databases/main/room.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
Mapping,
2727
Optional,
2828
Sequence,
29+
Set,
2930
Tuple,
3031
Union,
3132
cast,
@@ -1285,10 +1286,39 @@ def get_un_partial_stated_rooms_token(self) -> int:
12851286
# explanation.)
12861287
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
12871288

1289+
async def get_un_partial_stated_rooms_between(
1290+
self, last_id: int, current_id: int
1291+
) -> Set[str]:
1292+
"""Get all rooms that got un partial stated between `last_id` exclusive and
1293+
`current_id` inclusive.
1294+
1295+
Returns:
1296+
The list of rooms.
1297+
"""
1298+
1299+
if last_id == current_id:
1300+
return set()
1301+
1302+
def _get_un_partial_stated_rooms_between_txn(
1303+
txn: LoggingTransaction,
1304+
) -> Set[str]:
1305+
sql = """
1306+
SELECT DISTINCT room_id FROM un_partial_stated_room_stream
1307+
WHERE ? < stream_id AND stream_id <= ?
1308+
"""
1309+
txn.execute(sql, (last_id, current_id))
1310+
1311+
return {r[0] for r in txn}
1312+
1313+
return await self.db_pool.runInteraction(
1314+
"get_un_partial_stated_rooms_between",
1315+
_get_un_partial_stated_rooms_between_txn,
1316+
)
1317+
12881318
async def get_un_partial_stated_rooms_from_stream(
12891319
self, instance_name: str, last_id: int, current_id: int, limit: int
12901320
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
1291-
"""Get updates for caches replication stream.
1321+
"""Get updates for un partial stated rooms replication stream.
12921322
12931323
Args:
12941324
instance_name: The writer we want to fetch updates from. Unused

synapse/streams/events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
5858
push_rules_key = self.store.get_max_push_rules_stream_id()
5959
to_device_key = self.store.get_to_device_stream_token()
6060
device_list_key = self.store.get_device_stream_token()
61+
un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token()
6162

6263
token = StreamToken(
6364
room_key=self.sources.room.get_current_key(),
@@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
7071
device_list_key=device_list_key,
7172
# Groups key is unused.
7273
groups_key=0,
74+
un_partial_state_rooms_key=un_partial_state_rooms_key,
7375
)
7476
return token
7577

@@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
107109
to_device_key=0,
108110
device_list_key=0,
109111
groups_key=0,
112+
un_partial_state_rooms_key=0,
110113
)
111114
return token

synapse/types/__init__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,12 +646,13 @@ class StreamToken:
646646
7. `to_device_key`: `274711`
647647
8. `device_list_key`: `265584`
648648
9. `groups_key`: `1` (note that this key is now unused)
649+
10. `un_partial_state_rooms_key`: `379`
649650
650651
You can see how many of these keys correspond to the various
651652
fields in a "/sync" response:
652653
```json
653654
{
654-
"next_batch": "s12_4_0_1_1_1_1_4_1",
655+
"next_batch": "s12_4_0_1_1_1_1_4_1_1",
655656
"presence": {
656657
"events": []
657658
},
@@ -663,7 +664,7 @@ class StreamToken:
663664
"!QrZlfIDQLNLdZHqTnt:hs1": {
664665
"timeline": {
665666
"events": [],
666-
"prev_batch": "s10_4_0_1_1_1_1_4_1",
667+
"prev_batch": "s10_4_0_1_1_1_1_4_1_1",
667668
"limited": false
668669
},
669670
"state": {
@@ -699,6 +700,7 @@ class StreamToken:
699700
device_list_key: int
700701
# Note that the groups key is no longer used and may have bogus values.
701702
groups_key: int
703+
un_partial_state_rooms_key: int
702704

703705
_SEPARATOR = "_"
704706
START: ClassVar["StreamToken"]
@@ -737,6 +739,7 @@ async def to_string(self, store: "DataStore") -> str:
737739
# serialized so that there will not be confusion in the future
738740
# if additional tokens are added.
739741
str(self.groups_key),
742+
str(self.un_partial_state_rooms_key),
740743
]
741744
)
742745

@@ -769,7 +772,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
769772
return attr.evolve(self, **{key: new_value})
770773

771774

772-
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
775+
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
773776

774777

775778
@attr.s(slots=True, frozen=True, auto_attribs=True)

tests/rest/admin/test_room.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None:
18311831

18321832
def test_topo_token_is_accepted(self) -> None:
18331833
"""Test Topo Token is accepted."""
1834-
token = "t1-0_0_0_0_0_0_0_0_0"
1834+
token = "t1-0_0_0_0_0_0_0_0_0_0"
18351835
channel = self.make_request(
18361836
"GET",
18371837
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None:
18451845

18461846
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
18471847
"""Test that stream token is accepted for forward pagination."""
1848-
token = "s0_0_0_0_0_0_0_0_0"
1848+
token = "s0_0_0_0_0_0_0_0_0_0"
18491849
channel = self.make_request(
18501850
"GET",
18511851
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),

tests/rest/client/test_rooms.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
19871987
self.room_id = self.helper.create_room_as(self.user_id)
19881988

19891989
def test_topo_token_is_accepted(self) -> None:
1990-
token = "t1-0_0_0_0_0_0_0_0_0"
1990+
token = "t1-0_0_0_0_0_0_0_0_0_0"
19911991
channel = self.make_request(
19921992
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
19931993
)
@@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None:
19981998
self.assertTrue("end" in channel.json_body)
19991999

20002000
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
2001-
token = "s0_0_0_0_0_0_0_0_0"
2001+
token = "s0_0_0_0_0_0_0_0_0_0"
20022002
channel = self.make_request(
20032003
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
20042004
)
@@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None:
27282728
"""Test that we can filter by a label on a /messages request."""
27292729
self._send_labelled_messages_in_room()
27302730

2731-
token = "s0_0_0_0_0_0_0_0_0"
2731+
token = "s0_0_0_0_0_0_0_0_0_0"
27322732
channel = self.make_request(
27332733
"GET",
27342734
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None:
27452745
"""Test that we can filter by the absence of a label on a /messages request."""
27462746
self._send_labelled_messages_in_room()
27472747

2748-
token = "s0_0_0_0_0_0_0_0_0"
2748+
token = "s0_0_0_0_0_0_0_0_0_0"
27492749
channel = self.make_request(
27502750
"GET",
27512751
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None:
27682768
"""
27692769
self._send_labelled_messages_in_room()
27702770

2771-
token = "s0_0_0_0_0_0_0_0_0"
2771+
token = "s0_0_0_0_0_0_0_0_0_0"
27722772
channel = self.make_request(
27732773
"GET",
27742774
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"

0 commit comments

Comments
 (0)