Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 23eef74

Browse files
committed
Potentially send multiple EDUs to separate threads.
1 parent 367f73e commit 23eef74

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

synapse/federation/sender/per_destination_queue.py

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ def __init__(
136136
# destination
137137
self._pending_presence: Dict[str, UserPresenceState] = {}
138138

139-
# room_id -> receipt_type -> user_id -> receipt_dict
140-
self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
139+
# room_id -> receipt_type -> thread_id -> user_id -> receipt_dict
140+
self._pending_rrs: Dict[
141+
str, Dict[str, Dict[Optional[str], Dict[str, dict]]]
142+
] = {}
141143
self._rrs_pending_flush = False
142144

143145
# stream_id of last successfully sent to-device message.
@@ -202,12 +204,15 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
202204
Args:
203205
receipt: receipt to be queued
204206
"""
205-
serialized_receipt: JsonDict = {"event_ids": receipt.event_ids, "data": receipt.data}
207+
serialized_receipt: JsonDict = {
208+
"event_ids": receipt.event_ids,
209+
"data": receipt.data,
210+
}
206211
if receipt.thread_id is not None:
207212
serialized_receipt["data"]["thread_id"] = receipt.thread_id
208213
self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
209214
receipt.receipt_type, {}
210-
)[receipt.user_id] = serialized_receipt
215+
).setdefault(receipt.thread_id, {})[receipt.user_id] = serialized_receipt
211216

212217
def flush_read_receipts_for_room(self, room_id: str) -> None:
213218
# if we don't have any read-receipts for this room, it may be that we've already
@@ -552,15 +557,44 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
552557
# not yet time for this lot
553558
return
554559

555-
edu = Edu(
556-
origin=self._server_name,
557-
destination=self._destination,
558-
edu_type=EduTypes.RECEIPT,
559-
content=self._pending_rrs,
560-
)
560+
# Build the EDUs needed to send these receipts. This is a bit complicated
561+
# since we can share one for each unique (room, receipt type, user), but
562+
# need additional ones for different threads. The result is that we will
563+
# send N EDUs where N is the maximum number of threads in a room.
564+
#
565+
# This could be slightly more efficient by bundling users who have only
566+
# send receipts for different threads.
567+
while self._pending_rrs:
568+
# The next EDU's content.
569+
content = {}
570+
571+
# Iterate each room's receipt types and threads, adding it to the content.
572+
for room_id in list(self._pending_rrs.keys()):
573+
for receipt_type in list(self._pending_rrs[room_id].keys()):
574+
thread_ids = self._pending_rrs[room_id][receipt_type]
575+
# The thread ID itself doesn't matter at this point.
576+
content.setdefault(room_id, {})[
577+
receipt_type
578+
] = thread_ids.popitem()[1]
579+
580+
# If there are no threads left in this room / receipt type.
581+
# Clear it out.
582+
if not thread_ids:
583+
del self._pending_rrs[room_id][receipt_type]
584+
585+
# Again, clear out any blank rooms.
586+
if not self._pending_rrs[room_id]:
587+
del self._pending_rrs[room_id]
588+
589+
yield Edu(
590+
origin=self._server_name,
591+
destination=self._destination,
592+
edu_type=EduTypes.RECEIPT,
593+
content=content,
594+
)
595+
561596
self._pending_rrs = {}
562597
self._rrs_pending_flush = False
563-
yield edu
564598

565599
def _pop_pending_edus(self, limit: int) -> List[Edu]:
566600
pending_edus = self._pending_edus

tests/federation/test_federation_sender.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,17 @@ def test_send_receipts_thread(self):
9090
)
9191
mock_send_transaction.return_value = make_awaitable({})
9292

93+
# Create receipts for the same room and user, but on two different threads,
9394
sender = self.hs.get_federation_sender()
95+
receipt = ReadReceipt(
96+
"room_id",
97+
"m.read",
98+
"user_id",
99+
["event_id"],
100+
thread_id=None,
101+
data={"ts": 1234},
102+
)
103+
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
94104
receipt = ReadReceipt(
95105
"room_id",
96106
"m.read",
@@ -103,11 +113,12 @@ def test_send_receipts_thread(self):
103113

104114
self.pump()
105115

106-
# expect a call to send_transaction
116+
# expect a call to send_transaction with two EDUs to separate threads.
107117
mock_send_transaction.assert_called_once()
108118
json_cb = mock_send_transaction.call_args[0][1]
109119
data = json_cb()
110-
self.assertEqual(
120+
# Note that the ordering of the EDUs doesn't matter.
121+
self.assertCountEqual(
111122
data["edus"],
112123
[
113124
{
@@ -122,7 +133,20 @@ def test_send_receipts_thread(self):
122133
}
123134
}
124135
},
125-
}
136+
},
137+
{
138+
"edu_type": EduTypes.RECEIPT,
139+
"content": {
140+
"room_id": {
141+
"m.read": {
142+
"user_id": {
143+
"event_ids": ["event_id"],
144+
"data": {"ts": 1234},
145+
}
146+
}
147+
}
148+
},
149+
},
126150
],
127151
)
128152

0 commit comments

Comments
 (0)