35
35
from synapse .logging .opentracing import SynapseTags , set_tag
36
36
from synapse .metrics import sent_transactions_counter
37
37
from synapse .metrics .background_process_metrics import run_as_background_process
38
- from synapse .types import ReadReceipt
38
+ from synapse .types import JsonDict , ReadReceipt
39
39
from synapse .util .retryutils import NotRetryingDestination , get_retry_limiter
40
40
from synapse .visibility import filter_events_for_server
41
41
@@ -136,8 +136,11 @@ def __init__(
136
136
# destination
137
137
self ._pending_presence : Dict [str , UserPresenceState ] = {}
138
138
139
- # room_id -> receipt_type -> user_id -> receipt_dict
140
- self ._pending_rrs : Dict [str , Dict [str , Dict [str , dict ]]] = {}
139
+ # List of room_id -> receipt_type -> user_id -> receipt_dict,
140
+ #
141
+ # Each receipt can only have a single receipt per
142
+ # (room ID, receipt type, user ID, thread ID) tuple.
143
+ self ._pending_receipt_edus : List [Dict [str , Dict [str , Dict [str , dict ]]]] = []
141
144
self ._rrs_pending_flush = False
142
145
143
146
# stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
202
205
Args:
203
206
receipt: receipt to be queued
204
207
"""
205
- self ._pending_rrs .setdefault (receipt .room_id , {}).setdefault (
206
- receipt .receipt_type , {}
207
- )[receipt .user_id ] = {"event_ids" : receipt .event_ids , "data" : receipt .data }
208
+ serialized_receipt : JsonDict = {
209
+ "event_ids" : receipt .event_ids ,
210
+ "data" : receipt .data ,
211
+ }
212
+ if receipt .thread_id is not None :
213
+ serialized_receipt ["data" ]["thread_id" ] = receipt .thread_id
214
+
215
+ # Find which EDU to add this receipt to. There's three situations depending
216
+ # on the (room ID, receipt type, user, thread ID) tuple:
217
+ #
218
+ # 1. If it fully matches, clobber the information.
219
+ # 2. If it is missing, add the information.
220
+ # 3. If the subset tuple of (room ID, receipt type, user) matches, check
221
+ # the next EDU (or add a new EDU).
222
+ for edu in self ._pending_receipt_edus :
223
+ receipt_content = edu .setdefault (receipt .room_id , {}).setdefault (
224
+ receipt .receipt_type , {}
225
+ )
226
+ # If this room ID, receipt type, user ID is not in this EDU, OR if
227
+ # the full tuple matches, use the current EDU.
228
+ if (
229
+ receipt .user_id not in receipt_content
230
+ or receipt_content [receipt .user_id ].get ("thread_id" )
231
+ == receipt .thread_id
232
+ ):
233
+ receipt_content [receipt .user_id ] = serialized_receipt
234
+ break
235
+
236
+ # If no matching EDU was found, create a new one.
237
+ else :
238
+ self ._pending_receipt_edus .append (
239
+ {
240
+ receipt .room_id : {
241
+ receipt .receipt_type : {receipt .user_id : serialized_receipt }
242
+ }
243
+ }
244
+ )
208
245
209
246
def flush_read_receipts_for_room (self , room_id : str ) -> None :
210
- # if we don't have any read-receipts for this room, it may be that we've already
211
- # sent them out, so we don't need to flush.
212
- if room_id not in self ._pending_rrs :
213
- return
214
- self ._rrs_pending_flush = True
215
- self .attempt_new_transaction ()
247
+ # If there are any pending receipts for this room then force-flush them
248
+ # in a new transaction.
249
+ for edu in self ._pending_receipt_edus :
250
+ if room_id in edu :
251
+ self ._rrs_pending_flush = True
252
+ self .attempt_new_transaction ()
253
+ # No use in checking remaining EDUs if the room was found.
254
+ break
216
255
217
256
def send_keyed_edu (self , edu : Edu , key : Hashable ) -> None :
218
257
self ._pending_edus_keyed [(edu .edu_type , key )] = edu
@@ -351,7 +390,7 @@ async def _transaction_transmission_loop(self) -> None:
351
390
self ._pending_edus = []
352
391
self ._pending_edus_keyed = {}
353
392
self ._pending_presence = {}
354
- self ._pending_rrs = {}
393
+ self ._pending_receipt_edus = []
355
394
356
395
self ._start_catching_up ()
357
396
except FederationDeniedError as e :
@@ -543,22 +582,27 @@ async def _catch_up_transmission_loop(self) -> None:
543
582
self ._destination , last_successful_stream_ordering
544
583
)
545
584
546
- def _get_rr_edus (self , force_flush : bool ) -> Iterable [Edu ]:
547
- if not self ._pending_rrs :
585
+ def _get_receipt_edus (self , force_flush : bool , limit : int ) -> Iterable [Edu ]:
586
+ if not self ._pending_receipt_edus :
548
587
return
549
588
if not force_flush and not self ._rrs_pending_flush :
550
589
# not yet time for this lot
551
590
return
552
591
553
- edu = Edu (
554
- origin = self ._server_name ,
555
- destination = self ._destination ,
556
- edu_type = EduTypes .RECEIPT ,
557
- content = self ._pending_rrs ,
558
- )
559
- self ._pending_rrs = {}
560
- self ._rrs_pending_flush = False
561
- yield edu
592
+ # Send at most limit EDUs for receipts.
593
+ for content in self ._pending_receipt_edus [:limit ]:
594
+ yield Edu (
595
+ origin = self ._server_name ,
596
+ destination = self ._destination ,
597
+ edu_type = EduTypes .RECEIPT ,
598
+ content = content ,
599
+ )
600
+ self ._pending_receipt_edus = self ._pending_receipt_edus [limit :]
601
+
602
+ # If there are still pending read-receipts, don't reset the pending flush
603
+ # flag.
604
+ if not self ._pending_receipt_edus :
605
+ self ._rrs_pending_flush = False
562
606
563
607
def _pop_pending_edus (self , limit : int ) -> List [Edu ]:
564
608
pending_edus = self ._pending_edus
@@ -645,68 +689,79 @@ class _TransactionQueueManager:
645
689
async def __aenter__ (self ) -> Tuple [List [EventBase ], List [Edu ]]:
646
690
# First we calculate the EDUs we want to send, if any.
647
691
648
- # We start by fetching device related EDUs, i.e device updates and to
649
- # device messages. We have to keep 2 free slots for presence and rr_edus.
650
- device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
692
+ # There's a maximum number of EDUs that can be sent with a transaction,
693
+ # generally device updates and to-device messages get priority, but we
694
+ # want to ensure that there's room for some other EDUs as well.
695
+ #
696
+ # This is done by:
697
+ #
698
+ # * Add a presence EDU, if one exists.
699
+ # * Add up-to a small limit of read receipt EDUs.
700
+ # * Add to-device EDUs, but leave some space for device list updates.
701
+ # * Add device list updates EDUs.
702
+ # * If there's any remaining room, add other EDUs.
703
+ pending_edus = []
704
+
705
+ # Add presence EDU.
706
+ if self .queue ._pending_presence :
707
+ pending_edus .append (
708
+ Edu (
709
+ origin = self .queue ._server_name ,
710
+ destination = self .queue ._destination ,
711
+ edu_type = EduTypes .PRESENCE ,
712
+ content = {
713
+ "push" : [
714
+ format_user_presence_state (
715
+ presence , self .queue ._clock .time_msec ()
716
+ )
717
+ for presence in self .queue ._pending_presence .values ()
718
+ ]
719
+ },
720
+ )
721
+ )
722
+ self .queue ._pending_presence = {}
651
723
652
- # We prioritize to-device messages so that existing encryption channels
724
+ # Add read receipt EDUs.
725
+ pending_edus .extend (self .queue ._get_receipt_edus (force_flush = False , limit = 5 ))
726
+ edu_limit = MAX_EDUS_PER_TRANSACTION - len (pending_edus )
727
+
728
+ # Next, prioritize to-device messages so that existing encryption channels
653
729
# work. We also keep a few slots spare (by reducing the limit) so that
654
730
# we can still trickle out some device list updates.
655
731
(
656
732
to_device_edus ,
657
733
device_stream_id ,
658
- ) = await self .queue ._get_to_device_message_edus (device_edu_limit - 10 )
734
+ ) = await self .queue ._get_to_device_message_edus (edu_limit - 10 )
659
735
660
736
if to_device_edus :
661
737
self ._device_stream_id = device_stream_id
662
738
else :
663
739
self .queue ._last_device_stream_id = device_stream_id
664
740
665
- device_edu_limit -= len (to_device_edus )
741
+ pending_edus .extend (to_device_edus )
742
+ edu_limit -= len (to_device_edus )
666
743
744
+ # Add device list update EDUs.
667
745
device_update_edus , dev_list_id = await self .queue ._get_device_update_edus (
668
- device_edu_limit
746
+ edu_limit
669
747
)
670
748
671
749
if device_update_edus :
672
750
self ._device_list_id = dev_list_id
673
751
else :
674
752
self .queue ._last_device_list_stream_id = dev_list_id
675
753
676
- pending_edus = device_update_edus + to_device_edus
677
-
678
- # Now add the read receipt EDU.
679
- pending_edus .extend (self .queue ._get_rr_edus (force_flush = False ))
680
-
681
- # And presence EDU.
682
- if self .queue ._pending_presence :
683
- pending_edus .append (
684
- Edu (
685
- origin = self .queue ._server_name ,
686
- destination = self .queue ._destination ,
687
- edu_type = EduTypes .PRESENCE ,
688
- content = {
689
- "push" : [
690
- format_user_presence_state (
691
- presence , self .queue ._clock .time_msec ()
692
- )
693
- for presence in self .queue ._pending_presence .values ()
694
- ]
695
- },
696
- )
697
- )
698
- self .queue ._pending_presence = {}
754
+ pending_edus .extend (device_update_edus )
755
+ edu_limit -= len (device_update_edus )
699
756
700
757
# Finally add any other types of EDUs if there is room.
701
- pending_edus .extend (
702
- self .queue ._pop_pending_edus (MAX_EDUS_PER_TRANSACTION - len (pending_edus ))
703
- )
704
- while (
705
- len (pending_edus ) < MAX_EDUS_PER_TRANSACTION
706
- and self .queue ._pending_edus_keyed
707
- ):
758
+ other_edus = self .queue ._pop_pending_edus (edu_limit )
759
+ pending_edus .extend (other_edus )
760
+ edu_limit -= len (other_edus )
761
+ while edu_limit > 0 and self .queue ._pending_edus_keyed :
708
762
_ , val = self .queue ._pending_edus_keyed .popitem ()
709
763
pending_edus .append (val )
764
+ edu_limit -= 1
710
765
711
766
# Now we look for any PDUs to send, by getting up to 50 PDUs from the
712
767
# queue
@@ -717,8 +772,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
717
772
718
773
# if we've decided to send a transaction anyway, and we have room, we
719
774
# may as well send any pending RRs
720
- if len (pending_edus ) < MAX_EDUS_PER_TRANSACTION :
721
- pending_edus .extend (self .queue ._get_rr_edus (force_flush = True ))
775
+ if edu_limit :
776
+ pending_edus .extend (
777
+ self .queue ._get_receipt_edus (force_flush = True , limit = edu_limit )
778
+ )
722
779
723
780
if self ._pdus :
724
781
self ._last_stream_ordering = self ._pdus [
0 commit comments