Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit ee6535e

Browse files
committed
Handle threads when fetching events for push.
1 parent 52b0a3d commit ee6535e

File tree

3 files changed

+97
-40
lines changed

3 files changed

+97
-40
lines changed

changelog.d/13878.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).

synapse/storage/databases/main/event_push_actions.py

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,32 @@
120120
]
121121

122122

123+
@attr.s(slots=True, auto_attribs=True)
124+
class _RoomReceipt:
125+
"""
126+
HttpPushAction instances include the information used to generate HTTP
127+
requests to a push gateway.
128+
"""
129+
130+
unthreaded_stream_ordering: int = 0
131+
# threaded_stream_ordering includes the main pseudo-thread.
132+
threaded_stream_ordering: Dict[str, int] = attr.Factory(dict)
133+
134+
def is_unread(self, thread_id: str, stream_ordering: int) -> bool:
135+
"""Returns True if the stream ordering is unread according to the receipt information."""
136+
137+
# Only include push actions with a stream ordering after both the unthreaded
138+
# and threaded receipt. Properly handles a user without any receipts present.
139+
return (
140+
self.unthreaded_stream_ordering < stream_ordering
141+
and self.threaded_stream_ordering.get(thread_id, 0) < stream_ordering
142+
)
143+
144+
145+
# A _RoomReceipt with no receipts in it.
146+
MISSING_ROOM_RECEIPT = _RoomReceipt()
147+
148+
123149
@attr.s(slots=True, frozen=True, auto_attribs=True)
124150
class HttpPushAction:
125151
"""
@@ -705,7 +731,7 @@ def f(txn: LoggingTransaction) -> List[str]:
705731

706732
def _get_receipts_by_room_txn(
707733
self, txn: LoggingTransaction, user_id: str
708-
) -> Dict[str, int]:
734+
) -> Dict[str, _RoomReceipt]:
709735
"""
710736
Generate a map of room ID to the latest stream ordering that has been
711737
read by the given user.
@@ -715,7 +741,8 @@ def _get_receipts_by_room_txn(
715741
user_id: The user to fetch receipts for.
716742
717743
Returns:
718-
A map of room ID to stream ordering for all rooms the user has a receipt in.
744+
A map including all rooms the user is in with a receipt. It maps
745+
room IDs to _RoomReceipt instances
719746
"""
720747
receipt_types_clause, args = make_in_list_sql_clause(
721748
self.database_engine,
@@ -727,20 +754,26 @@ def _get_receipts_by_room_txn(
727754
)
728755

729756
sql = f"""
730-
SELECT room_id, MAX(stream_ordering)
757+
SELECT room_id, thread_id, MAX(stream_ordering)
731758
FROM receipts_linearized
732759
INNER JOIN events USING (room_id, event_id)
733760
WHERE {receipt_types_clause}
734761
AND user_id = ?
735-
GROUP BY room_id
762+
GROUP BY room_id, thread_id
736763
"""
737764

738765
args.extend((user_id,))
739766
txn.execute(sql, args)
740-
return {
741-
room_id: latest_stream_ordering
742-
for room_id, latest_stream_ordering in txn.fetchall()
743-
}
767+
768+
result: Dict[str, _RoomReceipt] = {}
769+
for room_id, thread_id, stream_ordering in txn:
770+
room_receipt = result.setdefault(room_id, _RoomReceipt())
771+
if thread_id is None:
772+
room_receipt.unthreaded_stream_ordering = stream_ordering
773+
else:
774+
room_receipt.threaded_stream_ordering[thread_id] = stream_ordering
775+
776+
return result
744777

745778
async def get_unread_push_actions_for_user_in_range_for_http(
746779
self,
@@ -773,9 +806,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(
773806

774807
def get_push_actions_txn(
775808
txn: LoggingTransaction,
776-
) -> List[Tuple[str, str, int, str, bool]]:
809+
) -> List[Tuple[str, str, str, int, str, bool]]:
777810
sql = """
778-
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
811+
SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
812+
ep.actions, ep.highlight
779813
FROM event_push_actions AS ep
780814
WHERE
781815
ep.user_id = ?
@@ -785,7 +819,7 @@ def get_push_actions_txn(
785819
ORDER BY ep.stream_ordering ASC LIMIT ?
786820
"""
787821
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
788-
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
822+
return cast(List[Tuple[str, str, str, int, str, bool]], txn.fetchall())
789823

790824
push_actions = await self.db_pool.runInteraction(
791825
"get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
@@ -798,10 +832,10 @@ def get_push_actions_txn(
798832
stream_ordering=stream_ordering,
799833
actions=_deserialize_action(actions, highlight),
800834
)
801-
for event_id, room_id, stream_ordering, actions, highlight in push_actions
802-
# Only include push actions with a stream ordering after any receipt, or without any
803-
# receipt present (invited to but never read rooms).
804-
if stream_ordering > receipts_by_room.get(room_id, 0)
835+
for event_id, room_id, thread_id, stream_ordering, actions, highlight in push_actions
836+
if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
837+
thread_id, stream_ordering
838+
)
805839
]
806840

807841
# Now sort it so it's ordered correctly, since currently it will
@@ -845,10 +879,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(
845879

846880
def get_push_actions_txn(
847881
txn: LoggingTransaction,
848-
) -> List[Tuple[str, str, int, str, bool, int]]:
882+
) -> List[Tuple[str, str, str, int, str, bool, int]]:
849883
sql = """
850-
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
851-
ep.highlight, e.received_ts
884+
SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
885+
ep.actions, ep.highlight, e.received_ts
852886
FROM event_push_actions AS ep
853887
INNER JOIN events AS e USING (room_id, event_id)
854888
WHERE
@@ -859,7 +893,7 @@ def get_push_actions_txn(
859893
ORDER BY ep.stream_ordering DESC LIMIT ?
860894
"""
861895
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
862-
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
896+
return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall())
863897

864898
push_actions = await self.db_pool.runInteraction(
865899
"get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
@@ -874,10 +908,10 @@ def get_push_actions_txn(
874908
actions=_deserialize_action(actions, highlight),
875909
received_ts=received_ts,
876910
)
877-
for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions
878-
# Only include push actions with a stream ordering after any receipt, or without any
879-
# receipt present (invited to but never read rooms).
880-
if stream_ordering > receipts_by_room.get(room_id, 0)
911+
for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions
912+
if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
913+
thread_id, stream_ordering
914+
)
881915
]
882916

883917
# Now sort it so it's ordered correctly, since currently it will

tests/storage/test_event_push_actions.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from twisted.test.proto_helpers import MemoryReactor
1818

19+
from synapse.api.constants import RelationTypes
1920
from synapse.rest import admin
2021
from synapse.rest.client import login, room
2122
from synapse.server import HomeServer
@@ -65,16 +66,23 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
6566
user_id, token, _, other_token, room_id = self._create_users_and_room()
6667

6768
# Create two events, one of which is a highlight.
68-
self.helper.send_event(
69+
first_event_id = self.helper.send_event(
6970
room_id,
7071
type="m.room.message",
7172
content={"msgtype": "m.text", "body": "msg"},
7273
tok=other_token,
73-
)
74-
event_id = self.helper.send_event(
74+
)["event_id"]
75+
second_event_id = self.helper.send_event(
7576
room_id,
7677
type="m.room.message",
77-
content={"msgtype": "m.text", "body": user_id},
78+
content={
79+
"msgtype": "m.text",
80+
"body": user_id,
81+
"m.relates_to": {
82+
"rel_type": RelationTypes.THREAD,
83+
"event_id": first_event_id,
84+
},
85+
},
7886
tok=other_token,
7987
)["event_id"]
8088

@@ -94,13 +102,13 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
94102
)
95103
self.assertEqual(2, len(email_actions))
96104

97-
# Send a receipt, which should clear any actions.
105+
# Send a receipt, which should clear the first action.
98106
self.get_success(
99107
self.store.insert_receipt(
100108
room_id,
101109
"m.read",
102110
user_id=user_id,
103-
event_ids=[event_id],
111+
event_ids=[first_event_id],
104112
thread_id=None,
105113
data={},
106114
)
@@ -110,6 +118,30 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
110118
user_id, 0, 1000, 20
111119
)
112120
)
121+
self.assertEqual(1, len(http_actions))
122+
email_actions = self.get_success(
123+
self.store.get_unread_push_actions_for_user_in_range_for_email(
124+
user_id, 0, 1000, 20
125+
)
126+
)
127+
self.assertEqual(1, len(email_actions))
128+
129+
# Send a thread receipt to clear the thread action.
130+
self.get_success(
131+
self.store.insert_receipt(
132+
room_id,
133+
"m.read",
134+
user_id=user_id,
135+
event_ids=[second_event_id],
136+
thread_id=first_event_id,
137+
data={},
138+
)
139+
)
140+
http_actions = self.get_success(
141+
self.store.get_unread_push_actions_for_user_in_range_for_http(
142+
user_id, 0, 1000, 20
143+
)
144+
)
113145
self.assertEqual([], http_actions)
114146
email_actions = self.get_success(
115147
self.store.get_unread_push_actions_for_user_in_range_for_email(
@@ -416,17 +448,7 @@ def test_count_aggregation_mixed(self) -> None:
416448
sends both unthreaded and threaded receipts.
417449
"""
418450

419-
# Create a user to receive notifications and send receipts.
420-
user_id = self.register_user("user1235", "pass")
421-
token = self.login("user1235", "pass")
422-
423-
# And another users to send events.
424-
other_id = self.register_user("other", "pass")
425-
other_token = self.login("other", "pass")
426-
427-
# Create a room and put both users in it.
428-
room_id = self.helper.create_room_as(user_id, tok=token)
429-
self.helper.join(room_id, other_id, tok=other_token)
451+
user_id, token, _, other_token, room_id = self._create_users_and_room()
430452
thread_id: str
431453

432454
last_event_id: str

0 commit comments

Comments
 (0)