119
119
]
120
120
121
121
122
+ @attr .s (slots = True , auto_attribs = True )
123
+ class _RoomReceipt :
124
+ """
125
+ HttpPushAction instances include the information used to generate HTTP
126
+ requests to a push gateway.
127
+ """
128
+
129
+ unthreaded_stream_ordering : int = 0
130
+ # threaded_stream_ordering includes the main pseudo-thread.
131
+ threaded_stream_ordering : Dict [str , int ] = attr .Factory (dict )
132
+
133
+ def is_unread (self , thread_id : str , stream_ordering : int ) -> bool :
134
+ """Returns True if the stream ordering is unread according to the receipt information."""
135
+
136
+ # Only include push actions with a stream ordering after both the unthreaded
137
+ # and threaded receipt. Properly handles a user without any receipts present.
138
+ return (
139
+ self .unthreaded_stream_ordering < stream_ordering
140
+ and self .threaded_stream_ordering .get (thread_id , 0 ) < stream_ordering
141
+ )
142
+
143
+
144
+ # A _RoomReceipt with no receipts in it.
145
+ MISSING_ROOM_RECEIPT = _RoomReceipt ()
146
+
147
+
122
148
@attr .s (slots = True , frozen = True , auto_attribs = True )
123
149
class HttpPushAction :
124
150
"""
@@ -716,7 +742,7 @@ def f(txn: LoggingTransaction) -> List[str]:
716
742
717
743
def _get_receipts_by_room_txn (
718
744
self , txn : LoggingTransaction , user_id : str
719
- ) -> Dict [str , int ]:
745
+ ) -> Dict [str , _RoomReceipt ]:
720
746
"""
721
747
Generate a map of room ID to the latest stream ordering that has been
722
748
read by the given user.
@@ -726,7 +752,8 @@ def _get_receipts_by_room_txn(
726
752
user_id: The user to fetch receipts for.
727
753
728
754
Returns:
729
- A map of room ID to stream ordering for all rooms the user has a receipt in.
755
+ A map including all rooms the user is in with a receipt. It maps
756
+ room IDs to _RoomReceipt instances
730
757
"""
731
758
receipt_types_clause , args = make_in_list_sql_clause (
732
759
self .database_engine ,
@@ -735,20 +762,26 @@ def _get_receipts_by_room_txn(
735
762
)
736
763
737
764
sql = f"""
738
- SELECT room_id, MAX(stream_ordering)
765
+ SELECT room_id, thread_id, MAX(stream_ordering)
739
766
FROM receipts_linearized
740
767
INNER JOIN events USING (room_id, event_id)
741
768
WHERE { receipt_types_clause }
742
769
AND user_id = ?
743
- GROUP BY room_id
770
+ GROUP BY room_id, thread_id
744
771
"""
745
772
746
773
args .extend ((user_id ,))
747
774
txn .execute (sql , args )
748
- return {
749
- room_id : latest_stream_ordering
750
- for room_id , latest_stream_ordering in txn .fetchall ()
751
- }
775
+
776
+ result : Dict [str , _RoomReceipt ] = {}
777
+ for room_id , thread_id , stream_ordering in txn :
778
+ room_receipt = result .setdefault (room_id , _RoomReceipt ())
779
+ if thread_id is None :
780
+ room_receipt .unthreaded_stream_ordering = stream_ordering
781
+ else :
782
+ room_receipt .threaded_stream_ordering [thread_id ] = stream_ordering
783
+
784
+ return result
752
785
753
786
async def get_unread_push_actions_for_user_in_range_for_http (
754
787
self ,
@@ -781,9 +814,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(
781
814
782
815
def get_push_actions_txn (
783
816
txn : LoggingTransaction ,
784
- ) -> List [Tuple [str , str , int , str , bool ]]:
817
+ ) -> List [Tuple [str , str , str , int , str , bool ]]:
785
818
sql = """
786
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
819
+ SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
820
+ ep.actions, ep.highlight
787
821
FROM event_push_actions AS ep
788
822
WHERE
789
823
ep.user_id = ?
@@ -793,7 +827,7 @@ def get_push_actions_txn(
793
827
ORDER BY ep.stream_ordering ASC LIMIT ?
794
828
"""
795
829
txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
796
- return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
830
+ return cast (List [Tuple [str , str , str , int , str , bool ]], txn .fetchall ())
797
831
798
832
push_actions = await self .db_pool .runInteraction (
799
833
"get_unread_push_actions_for_user_in_range_http" , get_push_actions_txn
@@ -806,10 +840,10 @@ def get_push_actions_txn(
806
840
stream_ordering = stream_ordering ,
807
841
actions = _deserialize_action (actions , highlight ),
808
842
)
809
- for event_id , room_id , stream_ordering , actions , highlight in push_actions
810
- # Only include push actions with a stream ordering after any receipt, or without any
811
- # receipt present (invited to but never read rooms).
812
- if stream_ordering > receipts_by_room . get ( room_id , 0 )
843
+ for event_id , room_id , thread_id , stream_ordering , actions , highlight in push_actions
844
+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
845
+ thread_id , stream_ordering
846
+ )
813
847
]
814
848
815
849
# Now sort it so it's ordered correctly, since currently it will
@@ -853,10 +887,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(
853
887
854
888
def get_push_actions_txn (
855
889
txn : LoggingTransaction ,
856
- ) -> List [Tuple [str , str , int , str , bool , int ]]:
890
+ ) -> List [Tuple [str , str , str , int , str , bool , int ]]:
857
891
sql = """
858
- SELECT ep.event_id, ep.room_id, ep.stream_ordering , ep.actions ,
859
- ep.highlight, e.received_ts
892
+ SELECT ep.event_id, ep.room_id, ep.thread_id , ep.stream_ordering ,
893
+ ep.actions, ep. highlight, e.received_ts
860
894
FROM event_push_actions AS ep
861
895
INNER JOIN events AS e USING (room_id, event_id)
862
896
WHERE
@@ -867,7 +901,7 @@ def get_push_actions_txn(
867
901
ORDER BY ep.stream_ordering DESC LIMIT ?
868
902
"""
869
903
txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
870
- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
904
+ return cast (List [Tuple [str , str , str , int , str , bool , int ]], txn .fetchall ())
871
905
872
906
push_actions = await self .db_pool .runInteraction (
873
907
"get_unread_push_actions_for_user_in_range_email" , get_push_actions_txn
@@ -882,10 +916,10 @@ def get_push_actions_txn(
882
916
actions = _deserialize_action (actions , highlight ),
883
917
received_ts = received_ts ,
884
918
)
885
- for event_id , room_id , stream_ordering , actions , highlight , received_ts in push_actions
886
- # Only include push actions with a stream ordering after any receipt, or without any
887
- # receipt present (invited to but never read rooms).
888
- if stream_ordering > receipts_by_room . get ( room_id , 0 )
919
+ for event_id , room_id , thread_id , stream_ordering , actions , highlight , received_ts in push_actions
920
+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
921
+ thread_id , stream_ordering
922
+ )
889
923
]
890
924
891
925
# Now sort it so it's ordered correctly, since currently it will
0 commit comments