119
119
]
120
120
121
121
122
+ @attr .s (slots = True , auto_attribs = True )
123
+ class _RoomReceipt :
124
+ """
125
+ HttpPushAction instances include the information used to generate HTTP
126
+ requests to a push gateway.
127
+ """
128
+
129
+ unthreaded_stream_ordering : int = 0
130
+ # threaded_stream_ordering includes the main pseudo-thread.
131
+ threaded_stream_ordering : Dict [str , int ] = attr .Factory (dict )
132
+
133
+ def is_unread (self , thread_id : str , stream_ordering : int ) -> bool :
134
+ """Returns True if the stream ordering is unread according to the receipt information."""
135
+
136
+ # Only include push actions with a stream ordering after both the unthreaded
137
+ # and threaded receipt. Properly handles a user without any receipts present.
138
+ return (
139
+ self .unthreaded_stream_ordering < stream_ordering
140
+ and self .threaded_stream_ordering .get (thread_id , 0 ) < stream_ordering
141
+ )
142
+
143
+
144
+ # A _RoomReceipt with no receipts in it.
145
+ MISSING_ROOM_RECEIPT = _RoomReceipt ()
146
+
147
+
122
148
@attr .s (slots = True , frozen = True , auto_attribs = True )
123
149
class HttpPushAction :
124
150
"""
@@ -709,7 +735,7 @@ def f(txn: LoggingTransaction) -> List[str]:
709
735
710
736
def _get_receipts_by_room_txn (
711
737
self , txn : LoggingTransaction , user_id : str
712
- ) -> Dict [str , int ]:
738
+ ) -> Dict [str , _RoomReceipt ]:
713
739
"""
714
740
Generate a map of room ID to the latest stream ordering that has been
715
741
read by the given user.
@@ -719,7 +745,8 @@ def _get_receipts_by_room_txn(
719
745
user_id: The user to fetch receipts for.
720
746
721
747
Returns:
722
- A map of room ID to stream ordering for all rooms the user has a receipt in.
748
+ A map including all rooms the user is in with a receipt. It maps
749
+ room IDs to _RoomReceipt instances
723
750
"""
724
751
receipt_types_clause , args = make_in_list_sql_clause (
725
752
self .database_engine ,
@@ -728,20 +755,26 @@ def _get_receipts_by_room_txn(
728
755
)
729
756
730
757
sql = f"""
731
- SELECT room_id, MAX(stream_ordering)
758
+ SELECT room_id, thread_id, MAX(stream_ordering)
732
759
FROM receipts_linearized
733
760
INNER JOIN events USING (room_id, event_id)
734
761
WHERE { receipt_types_clause }
735
762
AND user_id = ?
736
- GROUP BY room_id
763
+ GROUP BY room_id, thread_id
737
764
"""
738
765
739
766
args .extend ((user_id ,))
740
767
txn .execute (sql , args )
741
- return {
742
- room_id : latest_stream_ordering
743
- for room_id , latest_stream_ordering in txn .fetchall ()
744
- }
768
+
769
+ result : Dict [str , _RoomReceipt ] = {}
770
+ for room_id , thread_id , stream_ordering in txn :
771
+ room_receipt = result .setdefault (room_id , _RoomReceipt ())
772
+ if thread_id is None :
773
+ room_receipt .unthreaded_stream_ordering = stream_ordering
774
+ else :
775
+ room_receipt .threaded_stream_ordering [thread_id ] = stream_ordering
776
+
777
+ return result
745
778
746
779
async def get_unread_push_actions_for_user_in_range_for_http (
747
780
self ,
@@ -774,9 +807,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(
774
807
775
808
def get_push_actions_txn (
776
809
txn : LoggingTransaction ,
777
- ) -> List [Tuple [str , str , int , str , bool ]]:
810
+ ) -> List [Tuple [str , str , str , int , str , bool ]]:
778
811
sql = """
779
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
812
+ SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
813
+ ep.actions, ep.highlight
780
814
FROM event_push_actions AS ep
781
815
WHERE
782
816
ep.user_id = ?
@@ -786,7 +820,7 @@ def get_push_actions_txn(
786
820
ORDER BY ep.stream_ordering ASC LIMIT ?
787
821
"""
788
822
txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
789
- return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
823
+ return cast (List [Tuple [str , str , str , int , str , bool ]], txn .fetchall ())
790
824
791
825
push_actions = await self .db_pool .runInteraction (
792
826
"get_unread_push_actions_for_user_in_range_http" , get_push_actions_txn
@@ -799,10 +833,10 @@ def get_push_actions_txn(
799
833
stream_ordering = stream_ordering ,
800
834
actions = _deserialize_action (actions , highlight ),
801
835
)
802
- for event_id , room_id , stream_ordering , actions , highlight in push_actions
803
- # Only include push actions with a stream ordering after any receipt, or without any
804
- # receipt present (invited to but never read rooms).
805
- if stream_ordering > receipts_by_room . get ( room_id , 0 )
836
+ for event_id , room_id , thread_id , stream_ordering , actions , highlight in push_actions
837
+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
838
+ thread_id , stream_ordering
839
+ )
806
840
]
807
841
808
842
# Now sort it so it's ordered correctly, since currently it will
@@ -846,10 +880,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(
846
880
847
881
def get_push_actions_txn (
848
882
txn : LoggingTransaction ,
849
- ) -> List [Tuple [str , str , int , str , bool , int ]]:
883
+ ) -> List [Tuple [str , str , str , int , str , bool , int ]]:
850
884
sql = """
851
- SELECT ep.event_id, ep.room_id, ep.stream_ordering , ep.actions ,
852
- ep.highlight, e.received_ts
885
+ SELECT ep.event_id, ep.room_id, ep.thread_id , ep.stream_ordering ,
886
+ ep.actions, ep. highlight, e.received_ts
853
887
FROM event_push_actions AS ep
854
888
INNER JOIN events AS e USING (room_id, event_id)
855
889
WHERE
@@ -860,7 +894,7 @@ def get_push_actions_txn(
860
894
ORDER BY ep.stream_ordering DESC LIMIT ?
861
895
"""
862
896
txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
863
- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
897
+ return cast (List [Tuple [str , str , str , int , str , bool , int ]], txn .fetchall ())
864
898
865
899
push_actions = await self .db_pool .runInteraction (
866
900
"get_unread_push_actions_for_user_in_range_email" , get_push_actions_txn
@@ -875,10 +909,10 @@ def get_push_actions_txn(
875
909
actions = _deserialize_action (actions , highlight ),
876
910
received_ts = received_ts ,
877
911
)
878
- for event_id , room_id , stream_ordering , actions , highlight , received_ts in push_actions
879
- # Only include push actions with a stream ordering after any receipt, or without any
880
- # receipt present (invited to but never read rooms).
881
- if stream_ordering > receipts_by_room . get ( room_id , 0 )
912
+ for event_id , room_id , thread_id , stream_ordering , actions , highlight , received_ts in push_actions
913
+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
914
+ thread_id , stream_ordering
915
+ )
882
916
]
883
917
884
918
# Now sort it so it's ordered correctly, since currently it will
0 commit comments