Skip to content

Commit 5cc23c1

Browse files
Fizzadar, bradtgmurray, anoadragon453
committed
Bound ephemeral events by key (matrix-org#12544)
Co-authored-by: Brad Murray <[email protected]>
Co-authored-by: Andrew Morgan <[email protected]>
1 parent 95e8a3a commit 5cc23c1

File tree

4 files changed

+87
-4
lines changed

4 files changed

+87
-4
lines changed

changelog.d/12544.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug where attempting to send a large number of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.

synapse/handlers/appservice.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ async def _handle_typing(
416416
return typing
417417

418418
async def _handle_receipts(
419-
self, service: ApplicationService, new_token: Optional[int]
419+
self, service: ApplicationService, new_token: int
420420
) -> List[JsonDict]:
421421
"""
422422
Return the latest read receipts that the given application service should receive.
@@ -447,7 +447,7 @@ async def _handle_receipts(
447447

448448
receipts_source = self.event_sources.sources.receipt
449449
receipts, _ = await receipts_source.get_new_events_as(
450-
service=service, from_key=from_key
450+
service=service, from_key=from_key, to_key=new_token
451451
)
452452
return receipts
453453

synapse/handlers/receipts.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,14 @@ async def get_new_events(
240240
return events, to_key
241241

242242
async def get_new_events_as(
243-
self, from_key: int, service: ApplicationService
243+
self, from_key: int, to_key: int, service: ApplicationService
244244
) -> Tuple[List[JsonDict], int]:
245245
"""Returns a set of new read receipt events that an appservice
246246
may be interested in.
247247
248248
Args:
249249
from_key: the stream position at which events should be fetched from
250+
to_key: the stream position up to which events should be fetched to
250251
service: The appservice which may be interested
251252
252253
Returns:
@@ -256,7 +257,6 @@ async def get_new_events_as(
256257
* The current read receipt stream token.
257258
"""
258259
from_key = int(from_key)
259-
to_key = self.get_current_key()
260260

261261
if from_key == to_key:
262262
return [], to_key

tests/handlers/test_appservice.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,88 @@ def prepare(self, reactor, clock, hs):
411411
"exclusive_as_user", "password", self.exclusive_as_user_device_id
412412
)
413413

414+
def test_sending_read_receipt_batches_to_application_services(self):
415+
"""Tests that a large batch of read receipts are sent correctly to
416+
interested application services.
417+
"""
418+
# Register an application service that's interested in a certain user
419+
# and room prefix
420+
interested_appservice = self._register_application_service(
421+
namespaces={
422+
ApplicationService.NS_USERS: [
423+
{
424+
"regex": "@exclusive_as_user:.+",
425+
"exclusive": True,
426+
}
427+
],
428+
ApplicationService.NS_ROOMS: [
429+
{
430+
"regex": "!fakeroom_.*",
431+
"exclusive": True,
432+
}
433+
],
434+
},
435+
)
436+
437+
# "Complete" a transaction.
438+
# All this really does for us is make an entry in the application_services_state
439+
# database table, which tracks the current stream_token per stream ID per AS.
440+
self.get_success(
441+
self.hs.get_datastores().main.complete_appservice_txn(
442+
0,
443+
interested_appservice,
444+
)
445+
)
446+
447+
# Now, pretend that we receive a large burst of read receipts (300 total) that
448+
# all come in at once.
449+
for i in range(300):
450+
self.get_success(
451+
# Insert a fake read receipt into the database
452+
self.hs.get_datastores().main.insert_receipt(
453+
# We have to use unique room ID + user ID combinations here, as the db query
454+
# is an upsert.
455+
room_id=f"!fakeroom_{i}:test",
456+
receipt_type="m.read",
457+
user_id=self.local_user,
458+
event_ids=[f"$eventid_{i}"],
459+
data={},
460+
)
461+
)
462+
463+
# Now notify the appservice handler that 300 read receipts have all arrived
464+
# at once. What will it do!
465+
# note: stream tokens start at 2
466+
for stream_token in range(2, 303):
467+
self.get_success(
468+
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
469+
services=[interested_appservice],
470+
stream_key="receipt_key",
471+
new_token=stream_token,
472+
users=[self.exclusive_as_user],
473+
)
474+
)
475+
476+
# Using our txn send mock, we can see what the AS received. After iterating over every
477+
# transaction, we'd like to see all 300 read receipts accounted for.
478+
# No more, no less.
479+
all_ephemeral_events = []
480+
for call in self.send_mock.call_args_list:
481+
ephemeral_events = call[0][2]
482+
all_ephemeral_events += ephemeral_events
483+
484+
# Ensure that no duplicate events were sent
485+
self.assertEqual(len(all_ephemeral_events), 300)
486+
487+
# Check that the ephemeral event is a read receipt with the expected structure
488+
latest_read_receipt = all_ephemeral_events[-1]
489+
self.assertEqual(latest_read_receipt["type"], "m.receipt")
490+
491+
event_id = list(latest_read_receipt["content"].keys())[0]
492+
self.assertEqual(
493+
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
494+
)
495+
414496
@unittest.override_config(
415497
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
416498
)

0 commit comments

Comments
 (0)