Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 77c6dc7

Browse files
committed
Mark threads as read separately.
1 parent b844025 commit 77c6dc7

File tree

4 files changed

+405
-38
lines changed

4 files changed

+405
-38
lines changed

changelog.d/13877.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).

synapse/storage/databases/main/event_push_actions.py

Lines changed: 214 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
DatabasePool,
9696
LoggingDatabaseConnection,
9797
LoggingTransaction,
98+
PostgresEngine,
9899
)
99100
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
100101
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -427,8 +428,8 @@ def _get_unread_counts_by_pos_txn(
427428
room_id: The room ID to get unread counts for.
428429
user_id: The user ID to get unread counts for.
429430
receipt_stream_ordering: The stream ordering of the user's latest
430-
receipt in the room. If there are no receipts, the stream ordering
431-
of the user's join event.
431+
unthreaded receipt in the room. If there are no unthreaded receipts,
432+
the stream ordering of the user's join event.
432433
433434
Returns:
434435
A RoomNotifCounts object containing the notification count, the
@@ -444,6 +445,20 @@ def _get_thread(thread_id: str) -> NotifCounts:
444445
return main_counts
445446
return thread_counts.setdefault(thread_id, NotifCounts())
446447

448+
receipt_types_clause, receipts_args = make_in_list_sql_clause(
449+
self.database_engine,
450+
"receipt_type",
451+
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
452+
)
453+
454+
# PostgreSQL and SQLite differ in comparing scalar numerics.
455+
if isinstance(self.database_engine, PostgresEngine):
456+
# GREATEST ignores NULLs.
457+
receipt_stream_clause = "GREATEST(receipt_stream_ordering, ?)"
458+
else:
459+
# MAX returns NULL if any are NULL, so COALESCE to 0 first.
460+
receipt_stream_clause = "MAX(COALESCE(receipt_stream_ordering, 0), ?)"
461+
447462
# First we pull the counts from the summary table.
448463
#
449464
# We check that `last_receipt_stream_ordering` matches the stream
@@ -458,57 +473,151 @@ def _get_thread(thread_id: str) -> NotifCounts:
458473
# updated `event_push_summary` synchronously when persisting a new read
459474
# receipt).
460475
txn.execute(
461-
"""
462-
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
476+
f"""
477+
SELECT notif_count, COALESCE(unread_count, 0), thread_id
463478
FROM event_push_summary
479+
LEFT JOIN (
480+
SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
481+
FROM receipts_linearized
482+
LEFT JOIN events USING (room_id, event_id)
483+
WHERE
484+
user_id = ?
485+
AND room_id = ?
486+
AND {receipt_types_clause}
487+
GROUP BY thread_id
488+
) AS receipts USING (thread_id)
464489
WHERE room_id = ? AND user_id = ?
465490
AND (
466-
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
467-
OR last_receipt_stream_ordering = ?
491+
(last_receipt_stream_ordering IS NULL AND stream_ordering > {receipt_stream_clause})
492+
OR last_receipt_stream_ordering = {receipt_stream_clause}
468493
) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
469494
""",
470-
(room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
495+
(
496+
user_id,
497+
room_id,
498+
*receipts_args,
499+
room_id,
500+
user_id,
501+
receipt_stream_ordering,
502+
receipt_stream_ordering,
503+
),
471504
)
472-
max_summary_stream_ordering = 0
473-
for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
505+
summarised_threads = set()
506+
for notif_count, unread_count, thread_id in txn:
507+
summarised_threads.add(thread_id)
474508
counts = _get_thread(thread_id)
475509
counts.notify_count += notif_count
476510
counts.unread_count += unread_count
477511

478-
# Summaries will only be used if they have not been invalidated by
479-
# a recent receipt; track the latest stream ordering or a valid summary.
480-
#
481-
# Note that since there's only one read receipt in the room per user,
482-
# valid summaries are contiguous.
483-
max_summary_stream_ordering = max(
484-
summary_stream_ordering, max_summary_stream_ordering
485-
)
486-
487512
# Next we need to count highlights, which aren't summarised
488-
sql = """
513+
sql = f"""
489514
SELECT COUNT(*), thread_id FROM event_push_actions
515+
LEFT JOIN (
516+
SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
517+
FROM receipts_linearized
518+
LEFT JOIN events USING (room_id, event_id)
519+
WHERE
520+
user_id = ?
521+
AND room_id = ?
522+
AND {receipt_types_clause}
523+
GROUP BY thread_id
524+
) AS receipts USING (thread_id)
490525
WHERE user_id = ?
491526
AND room_id = ?
492-
AND stream_ordering > ?
527+
AND stream_ordering > {receipt_stream_clause}
493528
AND highlight = 1
494529
GROUP BY thread_id
495530
"""
496-
txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
531+
txn.execute(
532+
sql,
533+
(
534+
user_id,
535+
room_id,
536+
*receipts_args,
537+
user_id,
538+
room_id,
539+
receipt_stream_ordering,
540+
),
541+
)
497542
for highlight_count, thread_id in txn:
498543
_get_thread(thread_id).highlight_count += highlight_count
499544

545+
# For threads which were summarised we need to count actions since the last
546+
# rotation.
547+
thread_id_clause, thread_id_args = make_in_list_sql_clause(
548+
self.database_engine, "thread_id", summarised_threads
549+
)
550+
551+
# The (inclusive) event stream ordering that was previously summarised.
552+
rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
553+
txn,
554+
table="event_push_summary_stream_ordering",
555+
keyvalues={},
556+
retcol="stream_ordering",
557+
)
558+
559+
unread_counts = self._get_notif_unread_count_for_user_room(
560+
txn, room_id, user_id, rotated_upto_stream_ordering
561+
)
562+
for notif_count, unread_count, thread_id in unread_counts:
563+
if thread_id not in summarised_threads:
564+
continue
565+
566+
if thread_id == "main":
567+
counts.notify_count += notif_count
568+
counts.unread_count += unread_count
569+
elif thread_id in thread_counts:
570+
thread_counts[thread_id].notify_count += notif_count
571+
thread_counts[thread_id].unread_count += unread_count
572+
else:
573+
# Previous thread summaries of 0 are discarded above.
574+
#
575+
# TODO If empty summaries are deleted this can be removed.
576+
thread_counts[thread_id] = NotifCounts(
577+
notify_count=notif_count,
578+
unread_count=unread_count,
579+
highlight_count=0,
580+
)
581+
500582
# Finally we need to count push actions that aren't included in the
501583
# summary returned above. This might be due to recent events that haven't
502584
# been summarised yet or the summary is out of date due to a recent read
503585
# receipt.
504-
start_unread_stream_ordering = max(
505-
receipt_stream_ordering, max_summary_stream_ordering
506-
)
507-
unread_counts = self._get_notif_unread_count_for_user_room(
508-
txn, room_id, user_id, start_unread_stream_ordering
586+
sql = f"""
587+
SELECT
588+
COUNT(CASE WHEN notif = 1 THEN 1 END),
589+
COUNT(CASE WHEN unread = 1 THEN 1 END),
590+
thread_id
591+
FROM event_push_actions
592+
LEFT JOIN (
593+
SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
594+
FROM receipts_linearized
595+
LEFT JOIN events USING (room_id, event_id)
596+
WHERE
597+
user_id = ?
598+
AND room_id = ?
599+
AND {receipt_types_clause}
600+
GROUP BY thread_id
601+
) AS receipts USING (thread_id)
602+
WHERE user_id = ?
603+
AND room_id = ?
604+
AND stream_ordering > {receipt_stream_clause}
605+
AND NOT {thread_id_clause}
606+
GROUP BY thread_id
607+
"""
608+
txn.execute(
609+
sql,
610+
(
611+
user_id,
612+
room_id,
613+
*receipts_args,
614+
user_id,
615+
room_id,
616+
receipt_stream_ordering,
617+
*thread_id_args,
618+
),
509619
)
510-
511-
for notif_count, unread_count, thread_id in unread_counts:
620+
for notif_count, unread_count, thread_id in txn:
512621
counts = _get_thread(thread_id)
513622
counts.notify_count += notif_count
514623
counts.unread_count += unread_count
@@ -522,6 +631,7 @@ def _get_notif_unread_count_for_user_room(
522631
user_id: str,
523632
stream_ordering: int,
524633
max_stream_ordering: Optional[int] = None,
634+
thread_id: Optional[str] = None,
525635
) -> List[Tuple[int, int, str]]:
526636
"""Returns the notify and unread counts from `event_push_actions` for
527637
the given user/room in the given range.
@@ -547,17 +657,22 @@ def _get_notif_unread_count_for_user_room(
547657
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
548658
return []
549659

550-
clause = ""
660+
stream_ordering_clause = ""
551661
args = [user_id, room_id, stream_ordering]
552662
if max_stream_ordering is not None:
553-
clause = "AND ea.stream_ordering <= ?"
663+
stream_ordering_clause = "AND ea.stream_ordering <= ?"
554664
args.append(max_stream_ordering)
555665

556666
# If the max stream ordering is less than the min stream ordering,
557667
# then obviously there are zero push actions in that range.
558668
if max_stream_ordering <= stream_ordering:
559669
return []
560670

671+
thread_id_clause = ""
672+
if thread_id is not None:
673+
thread_id_clause = "AND thread_id = ?"
674+
args.append(thread_id)
675+
561676
sql = f"""
562677
SELECT
563678
COUNT(CASE WHEN notif = 1 THEN 1 END),
@@ -567,7 +682,8 @@ def _get_notif_unread_count_for_user_room(
567682
WHERE user_id = ?
568683
AND room_id = ?
569684
AND ea.stream_ordering > ?
570-
{clause}
685+
{stream_ordering_clause}
686+
{thread_id_clause}
571687
GROUP BY thread_id
572688
"""
573689

@@ -1083,7 +1199,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
10831199
)
10841200

10851201
sql = """
1086-
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
1202+
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
10871203
FROM receipts_linearized AS r
10881204
INNER JOIN events AS e USING (event_id)
10891205
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1106,13 +1222,18 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
11061222
)
11071223
rows = txn.fetchall()
11081224

1109-
# For each new read receipt we delete push actions from before it and
1110-
# recalculate the summary.
1111-
for _, room_id, user_id, stream_ordering in rows:
1225+
# First handle all the rows without a thread ID (i.e. ones that apply to
1226+
# the entire room).
1227+
for _, room_id, user_id, thread_id, stream_ordering in rows:
11121228
# Only handle our own read receipts.
11131229
if not self.hs.is_mine_id(user_id):
11141230
continue
11151231

1232+
if thread_id is not None:
1233+
continue
1234+
1235+
# For each new read receipt we delete push actions from before it and
1236+
# recalculate the summary.
11161237
txn.execute(
11171238
"""
11181239
DELETE FROM event_push_actions
@@ -1154,6 +1275,64 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
11541275
value_values=[(row[0], row[1]) for row in unread_counts],
11551276
)
11561277

1278+
# For each new read receipt we delete push actions from before it and
1279+
# recalculate the summary.
1280+
for _, room_id, user_id, thread_id, stream_ordering in rows:
1281+
# Only handle our own read receipts.
1282+
if not self.hs.is_mine_id(user_id):
1283+
continue
1284+
1285+
if thread_id is None:
1286+
continue
1287+
1288+
# For each new read receipt we delete push actions from before it and
1289+
# recalculate the summary.
1290+
txn.execute(
1291+
"""
1292+
DELETE FROM event_push_actions
1293+
WHERE room_id = ?
1294+
AND user_id = ?
1295+
AND thread_id = ?
1296+
AND stream_ordering <= ?
1297+
AND highlight = 0
1298+
""",
1299+
(room_id, user_id, thread_id, stream_ordering),
1300+
)
1301+
1302+
# Fetch the notification counts between the stream ordering of the
1303+
# latest receipt and what was previously summarised.
1304+
unread_counts = self._get_notif_unread_count_for_user_room(
1305+
txn,
1306+
room_id,
1307+
user_id,
1308+
stream_ordering,
1309+
old_rotate_stream_ordering,
1310+
thread_id,
1311+
)
1312+
# unread_counts will be a list of 0 or 1 items.
1313+
if unread_counts:
1314+
notif_count, unread_count, _ = unread_counts[0]
1315+
else:
1316+
notif_count = 0
1317+
unread_count = 0
1318+
1319+
# Update the summary of this specific thread.
1320+
self.db_pool.simple_upsert_txn(
1321+
txn,
1322+
table="event_push_summary",
1323+
keyvalues={
1324+
"room_id": room_id,
1325+
"user_id": user_id,
1326+
"thread_id": thread_id,
1327+
},
1328+
values={
1329+
"notif_count": notif_count,
1330+
"unread_count": unread_count,
1331+
"stream_ordering": old_rotate_stream_ordering,
1332+
"last_receipt_stream_ordering": stream_ordering,
1333+
},
1334+
)
1335+
11571336
# We always update `event_push_summary_last_receipt_stream_id` to
11581337
# ensure that we don't rescan the same receipts for remote users.
11591338

synapse/storage/databases/main/receipts.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,8 @@ def get_last_receipt_for_user_txn(
170170
receipt_types: Collection[str],
171171
) -> Optional[Tuple[str, int]]:
172172
"""
173-
Fetch the event ID and stream_ordering for the latest receipt in a room
174-
with one of the given receipt types.
173+
Fetch the event ID and stream_ordering for the latest unthreaded receipt
174+
in a room with one of the given receipt types.
175175
176176
Args:
177177
user_id: The user to fetch receipts for.
@@ -193,6 +193,7 @@ def get_last_receipt_for_user_txn(
193193
WHERE {clause}
194194
AND user_id = ?
195195
AND room_id = ?
196+
AND thread_id IS NULL
196197
ORDER BY stream_ordering DESC
197198
LIMIT 1
198199
"""

0 commit comments

Comments
 (0)