diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cf1899b580e8..d732826ca66e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -45,16 +45,6 @@ jobs: - run: poetry run scripts-dev/generate_sample_config.sh --check - run: poetry run scripts-dev/config-lint.sh - check-schema-delta: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 - with: - python-version: "3.x" - - run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'" - - run: scripts-dev/check_schema_delta.py --force-colors - check-lockfile: runs-on: ubuntu-latest steps: @@ -221,7 +211,6 @@ jobs: - lint-newsfile - lint-pydantic - check-sampleconfig - - check-schema-delta - check-lockfile - lint-clippy - lint-rustfmt @@ -609,6 +598,16 @@ jobs: - run: cargo bench --no-run + check-schema-delta: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.x" + - run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'" + - run: scripts-dev/check_schema_delta.py --force-colors + # a job which marks all the other jobs as complete, thus allowing PRs to be merged. tests-done: if: ${{ always() }} diff --git a/changelog.d/15724.bugfix b/changelog.d/15724.bugfix new file mode 100644 index 000000000000..8ffd3957f871 --- /dev/null +++ b/changelog.d/15724.bugfix @@ -0,0 +1 @@ +Fix missing dependencies in background jobs. diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 5b8ba436d48c..b07a00624b54 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -22,7 +22,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import MutableStateMap, StateMap from synapse.types.state import StateFilter from synapse.util.caches import intern_string @@ -328,6 +328,15 @@ def __init__( columns=["event_stream_ordering"], ) + self.db_pool.updates.register_background_update_handler( + "add_event_stream_ordering", + self._add_event_stream_ordering, + ) + + self.db_pool.updates.register_background_update_handler( + "add_stream_ordering_triggers", self._add_triggers_in_bg + ) + async def _background_deduplicate_state( self, progress: dict, batch_size: int ) -> int: @@ -504,3 +513,175 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None: ) return 1 + + async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int: + """ + Add denormalised copies of `stream_ordering` from the corresponding row in `events` + to the tables current_state_events, local_current_membership, and room_memberships. + This is done to improve database performance by reduring JOINs. + + """ + tables = [ + "current_state_events", + "local_current_membership", + "room_memberships", + ] + + if isinstance(self.database_engine, PostgresEngine): + + def check_pg_column(txn: LoggingTransaction, table: str) -> list: + """ + check if the column event_stream_ordering already exists + """ + check_sql = f""" + SELECT column_name FROM information_schema.columns + WHERE table_name = '{table}' and column_name = 'event_stream_ordering'; + """ + txn.execute(check_sql) + column = txn.fetchall() + return column + + def add_pg_column(txn: LoggingTransaction, table: str) -> None: + """ + Add column event_stream_ordering to A given table + """ + add_column_sql = f""" + ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT; + """ + txn.execute(add_column_sql) + + add_fk_sql = f""" + ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey + FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; + """ + txn.execute(add_fk_sql) + + for table in tables: + res = await self.db_pool.runInteraction( + "check_column", check_pg_column, table + ) + # if the column exists do nothing + if not res: + await self.db_pool.runInteraction( + "add_event_stream_ordering", + add_pg_column, + table, + ) + await self.db_pool.updates._end_background_update( + "add_event_stream_ordering" + ) + return 1 + + elif isinstance(self.database_engine, Sqlite3Engine): + + def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]: + """ + Get table info (to see if column event_stream_ordering exists) + """ + check_sql = f""" + PRAGMA table_info({table}) + """ + txn.execute(check_sql) + res = txn.fetchall() + return res + + def add_sqlite_column(txn: LoggingTransaction, table: str) -> None: + """ + Add column event_stream_ordering to given table + """ + add_column_sql = f""" + ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); + """ + txn.execute(add_column_sql) + + for table in tables: + res = await self.db_pool.runInteraction( + "check_column", check_sqlite_column, table + ) + columns = [tup[1] for tup in res] + + # if the column exists do nothing + if "event_stream_ordering" not in columns: + await self.db_pool.runInteraction( + "add_event_stream_ordering", add_sqlite_column, table + ) + + await self.db_pool.updates._end_background_update( + "add_event_stream_ordering" + ) + return 1 + + async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int: + """ + Adds triggers to the room membership tables to enforce consistency + """ + # Complain if the `event_stream_ordering` in membership tables doesn't match + # the `stream_ordering` row with the same `event_id` in `events`. + if isinstance(self.database_engine, Sqlite3Engine): + + def add_sqlite_triggers(txn: LoggingTransaction) -> None: + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + txn.execute( + f""" + CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering + BEFORE INSERT ON {table} + FOR EACH ROW + BEGIN + SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}') + WHERE EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ); + END; + """ + ) + + await self.db_pool.runInteraction( + "add_sqlite_triggers", add_sqlite_triggers + ) + elif isinstance(self.database_engine, PostgresEngine): + + def add_pg_triggers(txn: LoggingTransaction) -> None: + txn.execute( + """ + CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$ + BEGIN + IF EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ) THEN + RAISE EXCEPTION 'Incorrect event_stream_ordering'; + END IF; + RETURN NEW; + END; + $BODY$ LANGUAGE plpgsql; + """ + ) + + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + txn.execute( + f""" + CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table} + FOR EACH ROW + EXECUTE PROCEDURE check_event_stream_ordering() + """ + ) + + await self.db_pool.runInteraction("add_postgres_triggers", add_pg_triggers) + else: + raise NotImplementedError("Unknown database engine") + + await self.db_pool.updates._end_background_update( + "add_stream_ordering_triggers" + ) + return 1 diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql new file mode 100644 index 000000000000..991d0f6f8b94 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql @@ -0,0 +1,18 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) + VALUES + (7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column'); diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres deleted file mode 100644 index ceb750a9fa51..000000000000 --- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright 2022 Beeper - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which --- we use to improve database performance by reduring JOINs. - --- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables, --- which will be validated in a later migration. For all new/updated rows the FKEY will be checked. - -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; - -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; - -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite deleted file mode 100644 index 6f6283fdb769..000000000000 --- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright 2022 Beeper - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which --- we use to improve database performance by reduring JOINs. - --- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is - -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py deleted file mode 100644 index 2ee2bc9422a6..000000000000 --- a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2022 Beeper -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" -This migration adds triggers to the room membership tables to enforce consistency. -Triggers cannot be expressed in .sql files, so we have to use a separate file. -""" -from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine - - -def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - # Complain if the `event_stream_ordering` in membership tables doesn't match - # the `stream_ordering` row with the same `event_id` in `events`. - if isinstance(database_engine, Sqlite3Engine): - for table in ( - "current_state_events", - "local_current_membership", - "room_memberships", - ): - cur.execute( - f""" - CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering - BEFORE INSERT ON {table} - FOR EACH ROW - BEGIN - SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}') - WHERE EXISTS ( - SELECT 1 FROM events - WHERE events.event_id = NEW.event_id - AND events.stream_ordering != NEW.event_stream_ordering - ); - END; - """ - ) - elif isinstance(database_engine, PostgresEngine): - cur.execute( - """ - CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$ - BEGIN - IF EXISTS ( - SELECT 1 FROM events - WHERE events.event_id = NEW.event_id - AND events.stream_ordering != NEW.event_stream_ordering - ) THEN - RAISE EXCEPTION 'Incorrect event_stream_ordering'; - END IF; - RETURN NEW; - END; - $BODY$ LANGUAGE plpgsql; - """ - ) - - for table in ( - "current_state_events", - "local_current_membership", - "room_memberships", - ): - cur.execute( - f""" - CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table} - FOR EACH ROW - EXECUTE PROCEDURE check_event_stream_ordering() - """ - ) - else: - raise NotImplementedError("Unknown database engine") diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql new file mode 100644 index 000000000000..27dd44d31e41 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql @@ -0,0 +1,22 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +-- This migration adds triggers to the room membership tables to enforce consistency. + +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) + VALUES + (7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering'); \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/77/14bg_indices_event_stream_ordering.sql b/synapse/storage/schema/main/delta/77/14bg_indices_event_stream_ordering.sql index ec8cd522ecfc..9494a712517a 100644 --- a/synapse/storage/schema/main/delta/77/14bg_indices_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/77/14bg_indices_event_stream_ordering.sql @@ -13,8 +13,8 @@ * limitations under the License. */ -INSERT INTO background_updates (ordering, update_name, progress_json) +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (7714, 'current_state_events_stream_ordering_idx', '{}'), - (7714, 'local_current_membership_stream_ordering_idx', '{}'), - (7714, 'room_memberships_stream_ordering_idx', '{}'); + (7714, 'current_state_events_stream_ordering_idx', '{}', 'add_event_stream_ordering'), + (7714, 'local_current_membership_stream_ordering_idx', '{}', 'add_event_stream_ordering'), + (7714, 'room_memberships_stream_ordering_idx', '{}', 'add_event_stream_ordering');