Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 6d7523e

Browse files
authored
Batch fetch bundled references (#14508)
Avoid an n+1 query problem and fetch the bundled aggregations for m.reference relations in a single query instead of a query per event. This applies similar logic for as was previously done for edits in 8b309ad (#11660; threads in b65acea (#11752); and annotations in 1799a54 (#14491).
1 parent 1799a54 commit 6d7523e

File tree

6 files changed

+133
-79
lines changed

6 files changed

+133
-79
lines changed

changelog.d/14508.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.

synapse/handlers/relations.py

+59-69
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,7 @@
1313
# limitations under the License.
1414
import enum
1515
import logging
16-
from typing import (
17-
TYPE_CHECKING,
18-
Collection,
19-
Dict,
20-
FrozenSet,
21-
Iterable,
22-
List,
23-
Optional,
24-
Tuple,
25-
)
16+
from typing import TYPE_CHECKING, Collection, Dict, FrozenSet, Iterable, List, Optional
2617

2718
import attr
2819

@@ -32,7 +23,7 @@
3223
from synapse.logging.opentracing import trace
3324
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
3425
from synapse.streams.config import PaginationConfig
35-
from synapse.types import JsonDict, Requester, StreamToken, UserID
26+
from synapse.types import JsonDict, Requester, UserID
3627
from synapse.visibility import filter_events_for_client
3728

3829
if TYPE_CHECKING:
@@ -181,40 +172,6 @@ async def get_relations(
181172

182173
return return_value
183174

184-
async def get_relations_for_event(
185-
self,
186-
event_id: str,
187-
event: EventBase,
188-
room_id: str,
189-
relation_type: str,
190-
ignored_users: FrozenSet[str] = frozenset(),
191-
) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
192-
"""Get a list of events which relate to an event, ordered by topological ordering.
193-
194-
Args:
195-
event_id: Fetch events that relate to this event ID.
196-
event: The matching EventBase to event_id.
197-
room_id: The room the event belongs to.
198-
relation_type: The type of relation.
199-
ignored_users: The users ignored by the requesting user.
200-
201-
Returns:
202-
List of event IDs that match relations requested. The rows are of
203-
the form `{"event_id": "..."}`.
204-
"""
205-
206-
# Call the underlying storage method, which is cached.
207-
related_events, next_token = await self._main_store.get_relations_for_event(
208-
event_id, event, room_id, relation_type, direction="f"
209-
)
210-
211-
# Filter out ignored users and convert to the expected format.
212-
related_events = [
213-
event for event in related_events if event.sender not in ignored_users
214-
]
215-
216-
return related_events, next_token
217-
218175
async def redact_events_related_to(
219176
self,
220177
requester: Requester,
@@ -329,6 +286,46 @@ async def get_annotations_for_events(
329286

330287
return filtered_results
331288

289+
async def get_references_for_events(
290+
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
291+
) -> Dict[str, List[_RelatedEvent]]:
292+
"""Get a list of references to the given events.
293+
294+
Args:
295+
event_ids: Fetch events that relate to this event ID.
296+
ignored_users: The users ignored by the requesting user.
297+
298+
Returns:
299+
A map of event IDs to a list related events.
300+
"""
301+
302+
related_events = await self._main_store.get_references_for_events(event_ids)
303+
304+
# Avoid additional logic if there are no ignored users.
305+
if not ignored_users:
306+
return {
307+
event_id: results
308+
for event_id, results in related_events.items()
309+
if results
310+
}
311+
312+
# Filter out ignored users.
313+
results = {}
314+
for event_id, events in related_events.items():
315+
# If no references, skip.
316+
if not events:
317+
continue
318+
319+
# Filter ignored users out.
320+
events = [event for event in events if event.sender not in ignored_users]
321+
# If there are no events left, skip this event.
322+
if not events:
323+
continue
324+
325+
results[event_id] = events
326+
327+
return results
328+
332329
async def _get_threads_for_events(
333330
self,
334331
events_by_id: Dict[str, EventBase],
@@ -412,14 +409,18 @@ async def _get_threads_for_events(
412409
if event is None:
413410
continue
414411

415-
potential_events, _ = await self.get_relations_for_event(
416-
event_id,
417-
event,
418-
room_id,
419-
RelationTypes.THREAD,
420-
ignored_users,
412+
# Attempt to find another event to use as the latest event.
413+
potential_events, _ = await self._main_store.get_relations_for_event(
414+
event_id, event, room_id, RelationTypes.THREAD, direction="f"
421415
)
422416

417+
# Filter out ignored users.
418+
potential_events = [
419+
event
420+
for event in potential_events
421+
if event.sender not in ignored_users
422+
]
423+
423424
# If all found events are from ignored users, do not include
424425
# a summary of the thread.
425426
if not potential_events:
@@ -534,27 +535,16 @@ async def get_bundled_aggregations(
534535
"chunk": annotations
535536
}
536537

537-
# Fetch other relations per event.
538-
for event in events_by_id.values():
539-
# Fetch any references to bundle with this event.
540-
references, next_token = await self.get_relations_for_event(
541-
event.event_id,
542-
event,
543-
event.room_id,
544-
RelationTypes.REFERENCE,
545-
ignored_users=ignored_users,
546-
)
538+
# Fetch any references to bundle with this event.
539+
references_by_event_id = await self.get_references_for_events(
540+
events_by_id.keys(), ignored_users=ignored_users
541+
)
542+
for event_id, references in references_by_event_id.items():
547543
if references:
548-
aggregations = results.setdefault(event.event_id, BundledAggregations())
549-
aggregations.references = {
544+
results.setdefault(event_id, BundledAggregations()).references = {
550545
"chunk": [{"event_id": ev.event_id} for ev in references]
551546
}
552547

553-
if next_token:
554-
aggregations.references["next_batch"] = await next_token.to_string(
555-
self._main_store
556-
)
557-
558548
# Fetch any edits (but not for redacted events).
559549
#
560550
# Note that there is no use in limiting edits by ignored users since the
@@ -600,7 +590,7 @@ async def get_threads(
600590
room_id, requester, allow_departed_users=True
601591
)
602592

603-
# Note that ignored users are not passed into get_relations_for_event
593+
# Note that ignored users are not passed into get_threads
604594
# below. Ignored users are handled in filter_events_for_client (and by
605595
# not passing them in here we should get a better cache hit rate).
606596
thread_roots, next_batch = await self._main_store.get_threads(

synapse/storage/databases/main/cache.py

+1
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def _invalidate_caches_for_event(
259259

260260
if relates_to:
261261
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
262+
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
262263
self._attempt_to_invalidate_cache(
263264
"get_aggregation_groups_for_event", (relates_to,)
264265
)

synapse/storage/databases/main/events.py

+4
Original file line numberDiff line numberDiff line change
@@ -2049,6 +2049,10 @@ def _handle_redact_relations(
20492049
self.store._invalidate_cache_and_stream(
20502050
txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
20512051
)
2052+
if rel_type == RelationTypes.REFERENCE:
2053+
self.store._invalidate_cache_and_stream(
2054+
txn, self.store.get_references_for_event, (redacted_relates_to,)
2055+
)
20522056
if rel_type == RelationTypes.REPLACE:
20532057
self.store._invalidate_cache_and_stream(
20542058
txn, self.store.get_applicable_edit, (redacted_relates_to,)

synapse/storage/databases/main/relations.py

+66-8
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ class _RelatedEvent:
8282
event_id: str
8383
# The sender of the related event.
8484
sender: str
85-
topological_ordering: Optional[int]
86-
stream_ordering: int
8785

8886

8987
class RelationsWorkerStore(SQLBaseStore):
@@ -246,13 +244,17 @@ def _get_recent_references_for_event_txn(
246244
txn.execute(sql, where_args + [limit + 1])
247245

248246
events = []
249-
for event_id, relation_type, sender, topo_ordering, stream_ordering in txn:
247+
topo_orderings: List[int] = []
248+
stream_orderings: List[int] = []
249+
for event_id, relation_type, sender, topo_ordering, stream_ordering in cast(
250+
List[Tuple[str, str, str, int, int]], txn
251+
):
250252
# Do not include edits for redacted events as they leak event
251253
# content.
252254
if not is_redacted or relation_type != RelationTypes.REPLACE:
253-
events.append(
254-
_RelatedEvent(event_id, sender, topo_ordering, stream_ordering)
255-
)
255+
events.append(_RelatedEvent(event_id, sender))
256+
topo_orderings.append(topo_ordering)
257+
stream_orderings.append(stream_ordering)
256258

257259
# If there are more events, generate the next pagination key from the
258260
# last event returned.
@@ -261,9 +263,11 @@ def _get_recent_references_for_event_txn(
261263
# Instead of using the last row (which tells us there is more
262264
# data), use the last row to be returned.
263265
events = events[:limit]
266+
topo_orderings = topo_orderings[:limit]
267+
stream_orderings = stream_orderings[:limit]
264268

265-
topo = events[-1].topological_ordering
266-
token = events[-1].stream_ordering
269+
topo = topo_orderings[-1]
270+
token = stream_orderings[-1]
267271
if direction == "b":
268272
# Tokens are positions between events.
269273
# This token points *after* the last event in the chunk.
@@ -530,6 +534,60 @@ def _get_aggregation_groups_for_users_txn(
530534
"get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn
531535
)
532536

537+
@cached()
538+
async def get_references_for_event(self, event_id: str) -> List[JsonDict]:
539+
raise NotImplementedError()
540+
541+
@cachedList(cached_method_name="get_references_for_event", list_name="event_ids")
542+
async def get_references_for_events(
543+
self, event_ids: Collection[str]
544+
) -> Mapping[str, Optional[List[_RelatedEvent]]]:
545+
"""Get a list of references to the given events.
546+
547+
Args:
548+
event_ids: Fetch events that relate to these event IDs.
549+
550+
Returns:
551+
A map of event IDs to a list of related event IDs (and their senders).
552+
"""
553+
554+
clause, args = make_in_list_sql_clause(
555+
self.database_engine, "relates_to_id", event_ids
556+
)
557+
args.append(RelationTypes.REFERENCE)
558+
559+
sql = f"""
560+
SELECT relates_to_id, ref.event_id, ref.sender
561+
FROM events AS ref
562+
INNER JOIN event_relations USING (event_id)
563+
INNER JOIN events AS parent ON
564+
parent.event_id = relates_to_id
565+
AND parent.room_id = ref.room_id
566+
WHERE
567+
{clause}
568+
AND relation_type = ?
569+
ORDER BY ref.topological_ordering, ref.stream_ordering
570+
"""
571+
572+
def _get_references_for_events_txn(
573+
txn: LoggingTransaction,
574+
) -> Mapping[str, List[_RelatedEvent]]:
575+
txn.execute(sql, args)
576+
577+
result: Dict[str, List[_RelatedEvent]] = {}
578+
for relates_to_id, event_id, sender in cast(
579+
List[Tuple[str, str, str]], txn
580+
):
581+
result.setdefault(relates_to_id, []).append(
582+
_RelatedEvent(event_id, sender)
583+
)
584+
585+
return result
586+
587+
return await self.db_pool.runInteraction(
588+
"_get_references_for_events_txn", _get_references_for_events_txn
589+
)
590+
533591
@cached()
534592
def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
535593
raise NotImplementedError()

tests/rest/client/test_relations.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
11081108

11091109
# The "user" sent the root event and is making queries for the bundled
11101110
# aggregations: they have participated.
1111-
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 8)
1111+
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7)
11121112
# The "user2" sent replies in the thread and is making queries for the
11131113
# bundled aggregations: they have participated.
11141114
#
@@ -1170,7 +1170,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
11701170
bundled_aggregations["latest_event"].get("unsigned"),
11711171
)
11721172

1173-
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 8)
1173+
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7)
11741174

11751175
def test_nested_thread(self) -> None:
11761176
"""

0 commit comments

Comments
 (0)