@@ -294,6 +294,44 @@ def __init__(
294
294
self ._background_backfill_thread_id ,
295
295
)
296
296
297
+ # Indexes which will be used to quickly make the thread_id column non-null.
298
+ self .db_pool .updates .register_background_index_update (
299
+ "event_push_actions_thread_id_null" ,
300
+ index_name = "event_push_actions_thread_id_null" ,
301
+ table = "event_push_actions" ,
302
+ columns = ["thread_id" ],
303
+ where_clause = "thread_id IS NULL" ,
304
+ )
305
+ self .db_pool .updates .register_background_index_update (
306
+ "event_push_summary_thread_id_null" ,
307
+ index_name = "event_push_summary_thread_id_null" ,
308
+ table = "event_push_summary" ,
309
+ columns = ["thread_id" ],
310
+ where_clause = "thread_id IS NULL" ,
311
+ )
312
+
313
+ # Check ASAP (and then later, every 1s) to see if we have finished
314
+ # background updates the event_push_actions and event_push_summary tables.
315
+ self ._clock .call_later (0.0 , self ._check_event_push_backfill_thread_id )
316
+ self ._event_push_backfill_thread_id_done = False
317
+
318
+ @wrap_as_background_process ("check_event_push_backfill_thread_id" )
319
+ async def _check_event_push_backfill_thread_id (self ) -> None :
320
+ """
321
+ Has thread_id finished backfilling?
322
+
323
+ If not, we need to just-in-time update it so the queries work.
324
+ """
325
+ done = await self .db_pool .updates .has_completed_background_update (
326
+ "event_push_backfill_thread_id"
327
+ )
328
+
329
+ if done :
330
+ self ._event_push_backfill_thread_id_done = True
331
+ else :
332
+ # Reschedule to run.
333
+ self ._clock .call_later (15.0 , self ._check_event_push_backfill_thread_id )
334
+
297
335
async def _background_backfill_thread_id (
298
336
self , progress : JsonDict , batch_size : int
299
337
) -> int :
@@ -526,6 +564,25 @@ def _get_thread(thread_id: str) -> NotifCounts:
526
564
(ReceiptTypes .READ , ReceiptTypes .READ_PRIVATE ),
527
565
)
528
566
567
+ # First ensure that the existing rows have an updated thread_id field.
568
+ if not self ._event_push_backfill_thread_id_done :
569
+ txn .execute (
570
+ """
571
+ UPDATE event_push_summary
572
+ SET thread_id = ?
573
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
574
+ """ ,
575
+ (MAIN_TIMELINE , room_id , user_id ),
576
+ )
577
+ txn .execute (
578
+ """
579
+ UPDATE event_push_actions
580
+ SET thread_id = ?
581
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
582
+ """ ,
583
+ (MAIN_TIMELINE , room_id , user_id ),
584
+ )
585
+
529
586
# First we pull the counts from the summary table.
530
587
#
531
588
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1341,6 +1398,25 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
1341
1398
(room_id , user_id , stream_ordering , * thread_args ),
1342
1399
)
1343
1400
1401
+ # First ensure that the existing rows have an updated thread_id field.
1402
+ if not self ._event_push_backfill_thread_id_done :
1403
+ txn .execute (
1404
+ """
1405
+ UPDATE event_push_summary
1406
+ SET thread_id = ?
1407
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1408
+ """ ,
1409
+ (MAIN_TIMELINE , room_id , user_id ),
1410
+ )
1411
+ txn .execute (
1412
+ """
1413
+ UPDATE event_push_actions
1414
+ SET thread_id = ?
1415
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1416
+ """ ,
1417
+ (MAIN_TIMELINE , room_id , user_id ),
1418
+ )
1419
+
1344
1420
# Fetch the notification counts between the stream ordering of the
1345
1421
# latest receipt and what was previously summarised.
1346
1422
unread_counts = self ._get_notif_unread_count_for_user_room (
@@ -1475,6 +1551,19 @@ def _rotate_notifs_before_txn(
1475
1551
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
1476
1552
"""
1477
1553
1554
+ # Ensure that any new actions have an updated thread_id.
1555
+ if not self ._event_push_backfill_thread_id_done :
1556
+ txn .execute (
1557
+ """
1558
+ UPDATE event_push_actions
1559
+ SET thread_id = ?
1560
+ WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
1561
+ """ ,
1562
+ (MAIN_TIMELINE , old_rotate_stream_ordering , rotate_to_stream_ordering ),
1563
+ )
1564
+
1565
+ # XXX Do we need to update summaries here too?
1566
+
1478
1567
# Calculate the new counts that should be upserted into event_push_summary
1479
1568
sql = """
1480
1569
SELECT user_id, room_id, thread_id,
@@ -1537,6 +1626,20 @@ def _rotate_notifs_before_txn(
1537
1626
1538
1627
logger .info ("Rotating notifications, handling %d rows" , len (summaries ))
1539
1628
1629
+ # Ensure that any updated threads have the proper thread_id.
1630
+ if not self ._event_push_backfill_thread_id_done :
1631
+ txn .execute_batch (
1632
+ """
1633
+ UPDATE event_push_summary
1634
+ SET thread_id = ?
1635
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1636
+ """ ,
1637
+ [
1638
+ (MAIN_TIMELINE , room_id , user_id )
1639
+ for user_id , room_id , _ in summaries
1640
+ ],
1641
+ )
1642
+
1540
1643
self .db_pool .simple_upsert_many_txn (
1541
1644
txn ,
1542
1645
table = "event_push_summary" ,
0 commit comments