|
20 | 20 | from synapse.api.constants import EventTypes, RelationTypes
|
21 | 21 | from synapse.api.errors import SynapseError
|
22 | 22 | from synapse.events import EventBase, relation_from_event
|
| 23 | +from synapse.logging.context import make_deferred_yieldable, run_in_background |
23 | 24 | from synapse.logging.opentracing import trace
|
24 | 25 | from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
|
25 | 26 | from synapse.streams.config import PaginationConfig
|
26 | 27 | from synapse.types import JsonDict, Requester, UserID
|
| 28 | +from synapse.util.async_helpers import gather_results |
27 | 29 | from synapse.visibility import filter_events_for_client
|
28 | 30 |
|
29 | 31 | if TYPE_CHECKING:
|
@@ -525,39 +527,56 @@ async def get_bundled_aggregations(
|
525 | 527 | # (as that is what makes it part of the thread).
|
526 | 528 | relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD
|
527 | 529 |
|
528 |
| - # Fetch any annotations (ie, reactions) to bundle with this event. |
529 |
| - annotations_by_event_id = await self.get_annotations_for_events( |
530 |
| - events_by_id.keys(), ignored_users=ignored_users |
531 |
| - ) |
532 |
| - for event_id, annotations in annotations_by_event_id.items(): |
533 |
| - if annotations: |
534 |
| - results.setdefault(event_id, BundledAggregations()).annotations = { |
535 |
| - "chunk": annotations |
536 |
| - } |
537 |
| - |
538 |
| - # Fetch any references to bundle with this event. |
539 |
| - references_by_event_id = await self.get_references_for_events( |
540 |
| - events_by_id.keys(), ignored_users=ignored_users |
541 |
| - ) |
542 |
| - for event_id, references in references_by_event_id.items(): |
543 |
| - if references: |
544 |
| - results.setdefault(event_id, BundledAggregations()).references = { |
545 |
| - "chunk": [{"event_id": ev.event_id} for ev in references] |
546 |
| - } |
547 |
| - |
548 |
| - # Fetch any edits (but not for redacted events). |
549 |
| - # |
550 |
| - # Note that there is no use in limiting edits by ignored users since the |
551 |
| - # parent event should be ignored in the first place if the user is ignored. |
552 |
| - edits = await self._main_store.get_applicable_edits( |
553 |
| - [ |
554 |
| - event_id |
555 |
| - for event_id, event in events_by_id.items() |
556 |
| - if not event.internal_metadata.is_redacted() |
557 |
| - ] |
| 530 | + async def _fetch_annotations() -> None: |
| 531 | + """Fetch any annotations (ie, reactions) to bundle with this event.""" |
| 532 | + annotations_by_event_id = await self.get_annotations_for_events( |
| 533 | + events_by_id.keys(), ignored_users=ignored_users |
| 534 | + ) |
| 535 | + for event_id, annotations in annotations_by_event_id.items(): |
| 536 | + if annotations: |
| 537 | + results.setdefault(event_id, BundledAggregations()).annotations = { |
| 538 | + "chunk": annotations |
| 539 | + } |
| 540 | + |
| 541 | + async def _fetch_references() -> None: |
| 542 | + """Fetch any references to bundle with this event.""" |
| 543 | + references_by_event_id = await self.get_references_for_events( |
| 544 | + events_by_id.keys(), ignored_users=ignored_users |
| 545 | + ) |
| 546 | + for event_id, references in references_by_event_id.items(): |
| 547 | + if references: |
| 548 | + results.setdefault(event_id, BundledAggregations()).references = { |
| 549 | + "chunk": [{"event_id": ev.event_id} for ev in references] |
| 550 | + } |
| 551 | + |
| 552 | + async def _fetch_edits() -> None: |
| 553 | + """ |
| 554 | + Fetch any edits (but not for redacted events). |
| 555 | +
|
| 556 | + Note that there is no use in limiting edits by ignored users since the |
| 557 | + parent event should be ignored in the first place if the user is ignored. |
| 558 | + """ |
| 559 | + edits = await self._main_store.get_applicable_edits( |
| 560 | + [ |
| 561 | + event_id |
| 562 | + for event_id, event in events_by_id.items() |
| 563 | + if not event.internal_metadata.is_redacted() |
| 564 | + ] |
| 565 | + ) |
| 566 | + for event_id, edit in edits.items(): |
| 567 | + results.setdefault(event_id, BundledAggregations()).replace = edit |
| 568 | + |
| 569 | + # Parallelize the calls for annotations, references, and edits since they |
| 570 | + # are unrelated. |
| 571 | + await make_deferred_yieldable( |
| 572 | + gather_results( |
| 573 | + ( |
| 574 | + run_in_background(_fetch_annotations), |
| 575 | + run_in_background(_fetch_references), |
| 576 | + run_in_background(_fetch_edits), |
| 577 | + ) |
| 578 | + ) |
558 | 579 | )
|
559 |
| - for event_id, edit in edits.items(): |
560 |
| - results.setdefault(event_id, BundledAggregations()).replace = edit |
561 | 580 |
|
562 | 581 | return results
|
563 | 582 |
|
|
0 commit comments