Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 2fa68f4

Browse files
clokepH-Shay
authored andcommitted
Include thread information when sending receipts over federation. (#14466)
Include the thread_id field when sending read receipts over federation. This might result in the same user having multiple read receipts per-room, meaning multiple EDUs must be sent to encapsulate those receipts. This restructures the PerDestinationQueue APIs to support multiple receipt EDUs, queue_read_receipt now becomes linear time in the number of queued threaded receipts in the room for the given user, it is expected this is a small number since receipt EDUs are sent as filler in transactions.
1 parent 70fe2ba commit 2fa68f4

File tree

4 files changed

+198
-64
lines changed

4 files changed

+198
-64
lines changed

changelog.d/14466.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug introduced in Synapse 1.70.0 where a receipt's thread ID was not sent over federation.

synapse/federation/sender/per_destination_queue.py

Lines changed: 120 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from synapse.logging.opentracing import SynapseTags, set_tag
3636
from synapse.metrics import sent_transactions_counter
3737
from synapse.metrics.background_process_metrics import run_as_background_process
38-
from synapse.types import ReadReceipt
38+
from synapse.types import JsonDict, ReadReceipt
3939
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
4040
from synapse.visibility import filter_events_for_server
4141

@@ -136,8 +136,11 @@ def __init__(
136136
# destination
137137
self._pending_presence: Dict[str, UserPresenceState] = {}
138138

139-
# room_id -> receipt_type -> user_id -> receipt_dict
140-
self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
139+
# List of room_id -> receipt_type -> user_id -> receipt_dict,
140+
#
141+
# Each receipt can only have a single receipt per
142+
# (room ID, receipt type, user ID, thread ID) tuple.
143+
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
141144
self._rrs_pending_flush = False
142145

143146
# stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
202205
Args:
203206
receipt: receipt to be queued
204207
"""
205-
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
206-
receipt.receipt_type, {}
207-
)[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
208+
serialized_receipt: JsonDict = {
209+
"event_ids": receipt.event_ids,
210+
"data": receipt.data,
211+
}
212+
if receipt.thread_id is not None:
213+
serialized_receipt["data"]["thread_id"] = receipt.thread_id
214+
215+
# Find which EDU to add this receipt to. There's three situations depending
216+
# on the (room ID, receipt type, user, thread ID) tuple:
217+
#
218+
# 1. If it fully matches, clobber the information.
219+
# 2. If it is missing, add the information.
220+
# 3. If the subset tuple of (room ID, receipt type, user) matches, check
221+
# the next EDU (or add a new EDU).
222+
for edu in self._pending_receipt_edus:
223+
receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
224+
receipt.receipt_type, {}
225+
)
226+
# If this room ID, receipt type, user ID is not in this EDU, OR if
227+
# the full tuple matches, use the current EDU.
228+
if (
229+
receipt.user_id not in receipt_content
230+
or receipt_content[receipt.user_id].get("thread_id")
231+
== receipt.thread_id
232+
):
233+
receipt_content[receipt.user_id] = serialized_receipt
234+
break
235+
236+
# If no matching EDU was found, create a new one.
237+
else:
238+
self._pending_receipt_edus.append(
239+
{
240+
receipt.room_id: {
241+
receipt.receipt_type: {receipt.user_id: serialized_receipt}
242+
}
243+
}
244+
)
208245

209246
def flush_read_receipts_for_room(self, room_id: str) -> None:
210-
# if we don't have any read-receipts for this room, it may be that we've already
211-
# sent them out, so we don't need to flush.
212-
if room_id not in self._pending_rrs:
213-
return
214-
self._rrs_pending_flush = True
215-
self.attempt_new_transaction()
247+
# If there are any pending receipts for this room then force-flush them
248+
# in a new transaction.
249+
for edu in self._pending_receipt_edus:
250+
if room_id in edu:
251+
self._rrs_pending_flush = True
252+
self.attempt_new_transaction()
253+
# No use in checking remaining EDUs if the room was found.
254+
break
216255

217256
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
218257
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ async def _transaction_transmission_loop(self) -> None:
351390
self._pending_edus = []
352391
self._pending_edus_keyed = {}
353392
self._pending_presence = {}
354-
self._pending_rrs = {}
393+
self._pending_receipt_edus = []
355394

356395
self._start_catching_up()
357396
except FederationDeniedError as e:
@@ -543,22 +582,27 @@ async def _catch_up_transmission_loop(self) -> None:
543582
self._destination, last_successful_stream_ordering
544583
)
545584

546-
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
547-
if not self._pending_rrs:
585+
def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
586+
if not self._pending_receipt_edus:
548587
return
549588
if not force_flush and not self._rrs_pending_flush:
550589
# not yet time for this lot
551590
return
552591

553-
edu = Edu(
554-
origin=self._server_name,
555-
destination=self._destination,
556-
edu_type=EduTypes.RECEIPT,
557-
content=self._pending_rrs,
558-
)
559-
self._pending_rrs = {}
560-
self._rrs_pending_flush = False
561-
yield edu
592+
# Send at most limit EDUs for receipts.
593+
for content in self._pending_receipt_edus[:limit]:
594+
yield Edu(
595+
origin=self._server_name,
596+
destination=self._destination,
597+
edu_type=EduTypes.RECEIPT,
598+
content=content,
599+
)
600+
self._pending_receipt_edus = self._pending_receipt_edus[limit:]
601+
602+
# If there are still pending read-receipts, don't reset the pending flush
603+
# flag.
604+
if not self._pending_receipt_edus:
605+
self._rrs_pending_flush = False
562606

563607
def _pop_pending_edus(self, limit: int) -> List[Edu]:
564608
pending_edus = self._pending_edus
@@ -645,68 +689,79 @@ class _TransactionQueueManager:
645689
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
646690
# First we calculate the EDUs we want to send, if any.
647691

648-
# We start by fetching device related EDUs, i.e device updates and to
649-
# device messages. We have to keep 2 free slots for presence and rr_edus.
650-
device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
692+
# There's a maximum number of EDUs that can be sent with a transaction,
693+
# generally device updates and to-device messages get priority, but we
694+
# want to ensure that there's room for some other EDUs as well.
695+
#
696+
# This is done by:
697+
#
698+
# * Add a presence EDU, if one exists.
699+
# * Add up-to a small limit of read receipt EDUs.
700+
# * Add to-device EDUs, but leave some space for device list updates.
701+
# * Add device list updates EDUs.
702+
# * If there's any remaining room, add other EDUs.
703+
pending_edus = []
704+
705+
# Add presence EDU.
706+
if self.queue._pending_presence:
707+
pending_edus.append(
708+
Edu(
709+
origin=self.queue._server_name,
710+
destination=self.queue._destination,
711+
edu_type=EduTypes.PRESENCE,
712+
content={
713+
"push": [
714+
format_user_presence_state(
715+
presence, self.queue._clock.time_msec()
716+
)
717+
for presence in self.queue._pending_presence.values()
718+
]
719+
},
720+
)
721+
)
722+
self.queue._pending_presence = {}
651723

652-
# We prioritize to-device messages so that existing encryption channels
724+
# Add read receipt EDUs.
725+
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
726+
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
727+
728+
# Next, prioritize to-device messages so that existing encryption channels
653729
# work. We also keep a few slots spare (by reducing the limit) so that
654730
# we can still trickle out some device list updates.
655731
(
656732
to_device_edus,
657733
device_stream_id,
658-
) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
734+
) = await self.queue._get_to_device_message_edus(edu_limit - 10)
659735

660736
if to_device_edus:
661737
self._device_stream_id = device_stream_id
662738
else:
663739
self.queue._last_device_stream_id = device_stream_id
664740

665-
device_edu_limit -= len(to_device_edus)
741+
pending_edus.extend(to_device_edus)
742+
edu_limit -= len(to_device_edus)
666743

744+
# Add device list update EDUs.
667745
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
668-
device_edu_limit
746+
edu_limit
669747
)
670748

671749
if device_update_edus:
672750
self._device_list_id = dev_list_id
673751
else:
674752
self.queue._last_device_list_stream_id = dev_list_id
675753

676-
pending_edus = device_update_edus + to_device_edus
677-
678-
# Now add the read receipt EDU.
679-
pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
680-
681-
# And presence EDU.
682-
if self.queue._pending_presence:
683-
pending_edus.append(
684-
Edu(
685-
origin=self.queue._server_name,
686-
destination=self.queue._destination,
687-
edu_type=EduTypes.PRESENCE,
688-
content={
689-
"push": [
690-
format_user_presence_state(
691-
presence, self.queue._clock.time_msec()
692-
)
693-
for presence in self.queue._pending_presence.values()
694-
]
695-
},
696-
)
697-
)
698-
self.queue._pending_presence = {}
754+
pending_edus.extend(device_update_edus)
755+
edu_limit -= len(device_update_edus)
699756

700757
# Finally add any other types of EDUs if there is room.
701-
pending_edus.extend(
702-
self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
703-
)
704-
while (
705-
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
706-
and self.queue._pending_edus_keyed
707-
):
758+
other_edus = self.queue._pop_pending_edus(edu_limit)
759+
pending_edus.extend(other_edus)
760+
edu_limit -= len(other_edus)
761+
while edu_limit > 0 and self.queue._pending_edus_keyed:
708762
_, val = self.queue._pending_edus_keyed.popitem()
709763
pending_edus.append(val)
764+
edu_limit -= 1
710765

711766
# Now we look for any PDUs to send, by getting up to 50 PDUs from the
712767
# queue
@@ -717,8 +772,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
717772

718773
# if we've decided to send a transaction anyway, and we have room, we
719774
# may as well send any pending RRs
720-
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
721-
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
775+
if edu_limit:
776+
pending_edus.extend(
777+
self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
778+
)
722779

723780
if self._pdus:
724781
self._last_stream_ordering = self._pdus[

synapse/handlers/receipts.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None
9292
continue
9393

9494
# Check if these receipts apply to a thread.
95-
thread_id = None
9695
data = user_values.get("data", {})
9796
thread_id = data.get("thread_id")
9897
# If the thread ID is invalid, consider it missing.

tests/federation/test_federation_sender.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,83 @@ def test_send_receipts(self):
8383
],
8484
)
8585

86+
@override_config({"send_federation": True})
87+
def test_send_receipts_thread(self):
88+
mock_send_transaction = (
89+
self.hs.get_federation_transport_client().send_transaction
90+
)
91+
mock_send_transaction.return_value = make_awaitable({})
92+
93+
# Create receipts for:
94+
#
95+
# * The same room / user on multiple threads.
96+
# * A different user in the same room.
97+
sender = self.hs.get_federation_sender()
98+
for user, thread in (
99+
("alice", None),
100+
("alice", "thread"),
101+
("bob", None),
102+
("bob", "diff-thread"),
103+
):
104+
receipt = ReadReceipt(
105+
"room_id",
106+
"m.read",
107+
user,
108+
["event_id"],
109+
thread_id=thread,
110+
data={"ts": 1234},
111+
)
112+
self.successResultOf(
113+
defer.ensureDeferred(sender.send_read_receipt(receipt))
114+
)
115+
116+
self.pump()
117+
118+
# expect a call to send_transaction with two EDUs to separate threads.
119+
mock_send_transaction.assert_called_once()
120+
json_cb = mock_send_transaction.call_args[0][1]
121+
data = json_cb()
122+
# Note that the ordering of the EDUs doesn't matter.
123+
self.assertCountEqual(
124+
data["edus"],
125+
[
126+
{
127+
"edu_type": EduTypes.RECEIPT,
128+
"content": {
129+
"room_id": {
130+
"m.read": {
131+
"alice": {
132+
"event_ids": ["event_id"],
133+
"data": {"ts": 1234, "thread_id": "thread"},
134+
},
135+
"bob": {
136+
"event_ids": ["event_id"],
137+
"data": {"ts": 1234, "thread_id": "diff-thread"},
138+
},
139+
}
140+
}
141+
},
142+
},
143+
{
144+
"edu_type": EduTypes.RECEIPT,
145+
"content": {
146+
"room_id": {
147+
"m.read": {
148+
"alice": {
149+
"event_ids": ["event_id"],
150+
"data": {"ts": 1234},
151+
},
152+
"bob": {
153+
"event_ids": ["event_id"],
154+
"data": {"ts": 1234},
155+
},
156+
}
157+
}
158+
},
159+
},
160+
],
161+
)
162+
86163
@override_config({"send_federation": True})
87164
def test_send_receipts_with_backoff(self):
88165
"""Send two receipts in quick succession; the second should be flushed, but

0 commit comments

Comments
 (0)