Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Clear out old rows from event_push_actions_staging #14020

Merged
merged 8 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14020.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clear out stale entries in `event_push_actions_staging` table.
58 changes: 57 additions & 1 deletion synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def __init__(
):
super().__init__(database, db_conn, hs)

# Track when the process started.
self._started_ts = self._clock.time_msec()

# These get correctly set by _find_stream_orderings_for_times_txn
self.stream_ordering_month_ago: Optional[int] = None
self.stream_ordering_day_ago: Optional[int] = None
Expand All @@ -224,6 +227,10 @@ def __init__(
self._rotate_notifs, 30 * 1000
)

self._clear_old_staging_loop = self._clock.looping_call(
self._clear_old_push_actions_staging, 30 * 60 * 1000
)

self.db_pool.updates.register_background_index_update(
"event_push_summary_unique_index",
index_name="event_push_summary_unique_index",
Expand Down Expand Up @@ -791,7 +798,7 @@ async def add_push_actions_to_staging(
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
) -> Tuple[str, str, str, int, int, int, str]:
) -> Tuple[str, str, str, int, int, int, str, int]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
Expand All @@ -802,6 +809,7 @@ def _gen_entry(
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
self._clock.time_msec(), # inserted_ts column
)

await self.db_pool.simple_insert_many(
Expand All @@ -814,6 +822,7 @@ def _gen_entry(
"highlight",
"unread",
"thread_id",
"inserted_ts",
),
values=[
_gen_entry(user_id, actions)
Expand Down Expand Up @@ -1340,6 +1349,53 @@ def remove_old_push_actions_that_have_rotated_txn(
if done:
break

@wrap_as_background_process("_clear_old_push_actions_staging")
async def _clear_old_push_actions_staging(self) -> None:
"""Clear out any old event push actions from the staging table for
events that we failed to persist.
"""

# We delete anything more than an hour old, on the assumption that we'll
# never take more than an hour to persist an event.
delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000
Comment on lines +1358 to +1360
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is building in an assumption that the background worker and event persister workers are restarted within 1 hour of each other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, no: we add a default to `inserted_ts` for all new rows, so even if we don't restart the event persister we should have the right values in the table.

(I think; I spent quite a while trying to figure out how to make this work.)


if self._started_ts > self._clock.time_msec() - delete_before_ts:
# We need to wait for at least an hour before we started deleting,
# so that we know it's safe to delete rows with NULL `inserted_ts`.
return

# We don't have an index on `inserted_ts`, instead we assume that the
# number of "live" rows in `event_push_actions_staging` is small enough
# that an infrequent periodic scan won't cause a problem.
#
# Note: we also delete any columns with NULL `inserted_ts`, this is safe
# as we added a default value to new rows and so they must be at least
# an hour old.
limit = 1000
sql = """
DELETE FROM event_push_actions_staging WHERE event_id IN (
SELECT event_id FROM event_push_actions_staging WHERE
inserted_ts < ? OR inserted_ts IS NULL
LIMIT ?
)
"""

def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool:
txn.execute(sql, (delete_before_ts, limit))
return txn.rowcount >= limit

while True:
# Returns true if we have more stuff to delete from the table.
deleted = await self.db_pool.runInteraction(
"_clear_old_push_actions_staging", _clear_old_push_actions_staging_txn
)

if not deleted:
return

# We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0)


class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
events over federation.
- Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
`batch_events`) to make it easy to delete all associated rows when purging a room.
- `inserted_ts` column is added to `event_push_actions_staging` table.
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a column so that we know when a push action was inserted, to make it
-- easier to clear out old ones.
ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;

-- We now add a default for *new* rows. We don't do this above as we don't want
-- to have to update every row with the new default.
ALTER TABLE event_push_actions_staging ALTER COLUMN inserted_ts SET DEFAULT extract(epoch from now()) * 1000;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- On SQLite we must be in monolith mode and updating the database from Synapse,
-- so it's safe to assume that `event_push_actions_staging` should be empty (as
-- over a restart an event must either have been fully persisted or we'll
-- recalculate the push actions).
DELETE FROM event_push_actions_staging;

-- Add a column so that we know when a push action was inserted, to make it
-- easier to clear out old ones.
ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;