    DatabasePool,
    LoggingDatabaseConnection,
    LoggingTransaction,
+    PostgresEngine,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -427,8 +428,8 @@ def _get_unread_counts_by_pos_txn(
            room_id: The room ID to get unread counts for.
            user_id: The user ID to get unread counts for.
            receipt_stream_ordering: The stream ordering of the user's latest
-                receipt in the room. If there are no receipts, the stream ordering
-                of the user's join event.
+                unthreaded receipt in the room. If there are no unthreaded receipts,
+                the stream ordering of the user's join event.

        Returns:
            A RoomNotifCounts object containing the notification count, the
@@ -444,6 +445,20 @@ def _get_thread(thread_id: str) -> NotifCounts:
                return main_counts
            return thread_counts.setdefault(thread_id, NotifCounts())

+        receipt_types_clause, receipts_args = make_in_list_sql_clause(
+            self.database_engine,
+            "receipt_type",
+            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+        )
+
+        # PostgreSQL and SQLite differ in comparing scalar numerics.
+        if isinstance(self.database_engine, PostgresEngine):
+            # GREATEST ignores NULLs.
+            receipt_stream_clause = "GREATEST(receipt_stream_ordering, ?)"
+        else:
+            # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+            receipt_stream_clause = "MAX(COALESCE(receipt_stream_ordering, 0), ?)"
+
        # First we pull the counts from the summary table.
        #
        # We check that `last_receipt_stream_ordering` matches the stream
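The engine split above exists because the two databases disagree on NULL handling: PostgreSQL's `GREATEST()` ignores NULL arguments, while SQLite's scalar `MAX()` returns NULL if any argument is NULL, hence the `COALESCE` fallback. A standalone sqlite3 sketch (not part of this change) of the behaviour the comments describe:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# SQLite's scalar MAX() collapses to NULL when the receipt ordering is NULL...
print(conn.execute("SELECT MAX(NULL, 5)").fetchone())  # (None,)

# ...so the clause COALESCEs the missing receipt ordering to 0 first, keeping the
# fallback stream ordering (the second argument) usable.
print(conn.execute("SELECT MAX(COALESCE(NULL, 0), 5)").fetchone())  # (5,)
```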
@@ -458,57 +473,151 @@ def _get_thread(thread_id: str) -> NotifCounts:
        # updated `event_push_summary` synchronously when persisting a new read
        # receipt).
        txn.execute(
-            """
-            SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
+            f"""
+            SELECT notif_count, COALESCE(unread_count, 0), thread_id
            FROM event_push_summary
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
            WHERE room_id = ? AND user_id = ?
            AND (
-                (last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
-                OR last_receipt_stream_ordering = ?
+                (last_receipt_stream_ordering IS NULL AND stream_ordering > {receipt_stream_clause})
+                OR last_receipt_stream_ordering = {receipt_stream_clause}
            ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
            """,
-            (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
+            (
+                user_id,
+                room_id,
+                *receipts_args,
+                room_id,
+                user_id,
+                receipt_stream_ordering,
+                receipt_stream_ordering,
+            ),
        )
-        max_summary_stream_ordering = 0
-        for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
+        summarised_threads = set()
+        for notif_count, unread_count, thread_id in txn:
+            summarised_threads.add(thread_id)
            counts = _get_thread(thread_id)
            counts.notify_count += notif_count
            counts.unread_count += unread_count

-            # Summaries will only be used if they have not been invalidated by
-            # a recent receipt; track the latest stream ordering or a valid summary.
-            #
-            # Note that since there's only one read receipt in the room per user,
-            # valid summaries are contiguous.
-            max_summary_stream_ordering = max(
-                summary_stream_ordering, max_summary_stream_ordering
-            )
-
        # Next we need to count highlights, which aren't summarised
-        sql = """
+        sql = f"""
            SELECT COUNT(*), thread_id FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
            WHERE user_id = ?
                AND room_id = ?
-                AND stream_ordering > ?
+                AND stream_ordering > {receipt_stream_clause}
                AND highlight = 1
            GROUP BY thread_id
        """
-        txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                *receipts_args,
+                user_id,
+                room_id,
+                receipt_stream_ordering,
+            ),
+        )
        for highlight_count, thread_id in txn:
            _get_thread(thread_id).highlight_count += highlight_count

+        # For threads which were summarised we need to count actions since the last
+        # rotation.
+        thread_id_clause, thread_id_args = make_in_list_sql_clause(
+            self.database_engine, "thread_id", summarised_threads
+        )
+
+        # The (inclusive) event stream ordering that was previously summarised.
+        rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
+        unread_counts = self._get_notif_unread_count_for_user_room(
+            txn, room_id, user_id, rotated_upto_stream_ordering
+        )
+        for notif_count, unread_count, thread_id in unread_counts:
+            if thread_id not in summarised_threads:
+                continue
+
+            if thread_id == "main":
+                counts.notify_count += notif_count
+                counts.unread_count += unread_count
+            elif thread_id in thread_counts:
+                thread_counts[thread_id].notify_count += notif_count
+                thread_counts[thread_id].unread_count += unread_count
+            else:
+                # Previous thread summaries of 0 are discarded above.
+                #
+                # TODO If empty summaries are deleted this can be removed.
+                thread_counts[thread_id] = NotifCounts(
+                    notify_count=notif_count,
+                    unread_count=unread_count,
+                    highlight_count=0,
+                )
+
        # Finally we need to count push actions that aren't included in the
        # summary returned above. This might be due to recent events that haven't
        # been summarised yet or the summary is out of date due to a recent read
        # receipt.
-        start_unread_stream_ordering = max(
-            receipt_stream_ordering, max_summary_stream_ordering
-        )
-        unread_counts = self._get_notif_unread_count_for_user_room(
-            txn, room_id, user_id, start_unread_stream_ordering
+        sql = f"""
+            SELECT
+                COUNT(CASE WHEN notif = 1 THEN 1 END),
+                COUNT(CASE WHEN unread = 1 THEN 1 END),
+                thread_id
+            FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
+            WHERE user_id = ?
+                AND room_id = ?
+                AND stream_ordering > {receipt_stream_clause}
+                AND NOT {thread_id_clause}
+            GROUP BY thread_id
+        """
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                *receipts_args,
+                user_id,
+                room_id,
+                receipt_stream_ordering,
+                *thread_id_args,
+            ),
        )
-
-        for notif_count, unread_count, thread_id in unread_counts:
+        for notif_count, unread_count, thread_id in txn:
            counts = _get_thread(thread_id)
            counts.notify_count += notif_count
            counts.unread_count += unread_count
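Each of the three queries above now LEFT JOINs a subquery that picks, per thread, the stream ordering of the user's latest matching receipt; threads without any receipt come through as NULL and fall back to `receipt_stream_ordering` via the clause built earlier. A toy sqlite3 sketch (simplified tables, not Synapse's schema) of that join shape:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Toy stand-ins for event_push_actions and receipts_linearized.
    CREATE TABLE push_actions (thread_id TEXT, stream_ordering INTEGER);
    CREATE TABLE receipts (thread_id TEXT, stream_ordering INTEGER);

    INSERT INTO push_actions VALUES ('main', 5), ('main', 12), ('$thread1', 7);
    -- The user has read up to stream ordering 10 in the main timeline only.
    INSERT INTO receipts VALUES ('main', 10);
    """
)

rows = conn.execute(
    """
    SELECT thread_id, COUNT(*)
    FROM push_actions
    LEFT JOIN (
        SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
        FROM receipts
        GROUP BY thread_id
    ) AS latest USING (thread_id)
    -- SQLite form of receipt_stream_clause, with the unthreaded fallback bound to 0.
    WHERE stream_ordering > MAX(COALESCE(receipt_stream_ordering, 0), ?)
    GROUP BY thread_id
    """,
    (0,),
).fetchall()
print(rows)  # e.g. [('$thread1', 1), ('main', 1)]
```

Because the `?` placeholders inside the joined subquery come before those of the outer query, the bound arguments are spliced in placeholder order, which is why the tuples above interleave `*receipts_args` between the two sets of parameters.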
@@ -522,6 +631,7 @@ def _get_notif_unread_count_for_user_room(
        user_id: str,
        stream_ordering: int,
        max_stream_ordering: Optional[int] = None,
+        thread_id: Optional[str] = None,
    ) -> List[Tuple[int, int, str]]:
        """Returns the notify and unread counts from `event_push_actions` for
        the given user/room in the given range.
@@ -547,17 +657,22 @@ def _get_notif_unread_count_for_user_room(
        if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
            return []

-        clause = ""
+        stream_ordering_clause = ""
        args = [user_id, room_id, stream_ordering]
        if max_stream_ordering is not None:
-            clause = "AND ea.stream_ordering <= ?"
+            stream_ordering_clause = "AND ea.stream_ordering <= ?"
            args.append(max_stream_ordering)

            # If the max stream ordering is less than the min stream ordering,
            # then obviously there are zero push actions in that range.
            if max_stream_ordering <= stream_ordering:
                return []

+        thread_id_clause = ""
+        if thread_id is not None:
+            thread_id_clause = "AND thread_id = ?"
+            args.append(thread_id)
+
        sql = f"""
            SELECT
                COUNT(CASE WHEN notif = 1 THEN 1 END),
@@ -567,7 +682,8 @@ def _get_notif_unread_count_for_user_room(
            WHERE user_id = ?
                AND room_id = ?
                AND ea.stream_ordering > ?
-                {clause}
+                {stream_ordering_clause}
+                {thread_id_clause}
            GROUP BY thread_id
        """
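The renamed `stream_ordering_clause` and the new `thread_id_clause` follow the same pattern: each optional filter contributes a SQL fragment plus its bound argument, appended in the same order as its placeholder. A hypothetical, self-contained sketch of that pattern (illustration only, not Synapse code):

```python
from typing import List, Optional, Tuple


def build_optional_filters(
    max_stream_ordering: Optional[int], thread_id: Optional[str]
) -> Tuple[str, List[object]]:
    """Build the trailing WHERE fragments and their args, keeping both in sync."""
    clauses: List[str] = []
    args: List[object] = []
    if max_stream_ordering is not None:
        clauses.append("AND ea.stream_ordering <= ?")
        args.append(max_stream_ordering)
    if thread_id is not None:
        clauses.append("AND thread_id = ?")
        args.append(thread_id)
    return " ".join(clauses), args


# e.g. ("AND ea.stream_ordering <= ? AND thread_id = ?", [100, "$thread:example"])
print(build_optional_filters(100, "$thread:example"))
```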
@@ -1083,7 +1199,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
        )

        sql = """
-            SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
+            SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
            FROM receipts_linearized AS r
            INNER JOIN events AS e USING (event_id)
            WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1106,13 +1222,18 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
        )
        rows = txn.fetchall()

-        # For each new read receipt we delete push actions from before it and
-        # recalculate the summary.
-        for _, room_id, user_id, stream_ordering in rows:
+        # First handle all the rows without a thread ID (i.e. ones that apply to
+        # the entire room).
+        for _, room_id, user_id, thread_id, stream_ordering in rows:
            # Only handle our own read receipts.
            if not self.hs.is_mine_id(user_id):
                continue

+            if thread_id is not None:
+                continue
+
+            # For each new read receipt we delete push actions from before it and
+            # recalculate the summary.
            txn.execute(
                """
                DELETE FROM event_push_actions
@@ -1154,6 +1275,64 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
            value_values=[(row[0], row[1]) for row in unread_counts],
        )

+        # Now handle the rows with a thread ID: each threaded receipt updates the
+        # summary for its specific thread rather than the room-level one.
+        for _, room_id, user_id, thread_id, stream_ordering in rows:
+            # Only handle our own read receipts.
+            if not self.hs.is_mine_id(user_id):
+                continue
+
+            if thread_id is None:
+                continue
+
+            # For each new threaded read receipt we delete push actions from before
+            # it and recalculate the summary for that thread.
+            txn.execute(
+                """
+                DELETE FROM event_push_actions
+                WHERE room_id = ?
+                    AND user_id = ?
+                    AND thread_id = ?
+                    AND stream_ordering <= ?
+                    AND highlight = 0
+                """,
+                (room_id, user_id, thread_id, stream_ordering),
+            )
+
+            # Fetch the notification counts between the stream ordering of the
+            # latest receipt and what was previously summarised.
+            unread_counts = self._get_notif_unread_count_for_user_room(
+                txn,
+                room_id,
+                user_id,
+                stream_ordering,
+                old_rotate_stream_ordering,
+                thread_id,
+            )
+            # unread_counts will be a list of 0 or 1 items.
+            if unread_counts:
+                notif_count, unread_count, _ = unread_counts[0]
+            else:
+                notif_count = 0
+                unread_count = 0
+
+            # Update the summary of this specific thread.
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="event_push_summary",
+                keyvalues={
+                    "room_id": room_id,
+                    "user_id": user_id,
+                    "thread_id": thread_id,
+                },
+                values={
+                    "notif_count": notif_count,
+                    "unread_count": unread_count,
+                    "stream_ordering": old_rotate_stream_ordering,
+                    "last_receipt_stream_ordering": stream_ordering,
+                },
+            )
+
        # We always update `event_push_summary_last_receipt_stream_id` to
        # ensure that we don't rescan the same receipts for remote users.
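For each threaded receipt the sequence is: delete the now-read, non-highlight push actions for that thread, recount what remains up to the last rotation point, and upsert the result into `event_push_summary` keyed by room, user and thread. A toy sqlite3 sketch (toy table and plain SQL, standing in for what Synapse's `simple_upsert_txn` helper does here) of the keyed upsert this relies on:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE event_push_summary (
        room_id TEXT, user_id TEXT, thread_id TEXT,
        notif_count INTEGER, unread_count INTEGER,
        stream_ordering INTEGER, last_receipt_stream_ordering INTEGER,
        PRIMARY KEY (room_id, user_id, thread_id)
    )
    """
)


def upsert_thread_summary(room_id, user_id, thread_id, notif, unread, rotated, receipt):
    # Insert a fresh per-thread summary, or overwrite the existing row for the same
    # (room, user, thread) key -- the effect of the upsert in the diff above.
    conn.execute(
        """
        INSERT INTO event_push_summary VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT (room_id, user_id, thread_id) DO UPDATE SET
            notif_count = excluded.notif_count,
            unread_count = excluded.unread_count,
            stream_ordering = excluded.stream_ordering,
            last_receipt_stream_ordering = excluded.last_receipt_stream_ordering
        """,
        (room_id, user_id, thread_id, notif, unread, rotated, receipt),
    )


# A receipt arrives mid-thread, then a later receipt clears the counts.
upsert_thread_summary("!room:test", "@alice:test", "$thread", 2, 3, 100, 110)
upsert_thread_summary("!room:test", "@alice:test", "$thread", 0, 0, 100, 120)
rows = conn.execute(
    "SELECT notif_count, last_receipt_stream_ordering FROM event_push_summary"
).fetchall()
print(rows)  # [(0, 120)]
```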