-
Notifications
You must be signed in to change notification settings - Fork 81
ENG-663: add StagedResourceAncestors
table and remove child_diff_statuses
#6185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
0c4278b
remove child diff status; add ancestor linking table
adamsachs 865e079
add test comments
adamsachs 6b3d1ff
fix db_dataset.yml
adamsachs 690d30a
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs 66470d0
update migration
adamsachs e8fbacb
breakup migrations, first shot at ancestor data migration
adamsachs 6310ef3
optimize data migration. indexes and constraints post-migration
adamsachs bf68826
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs 2d21962
change downrev
adamsachs 66ac9ee
add changelog
adamsachs 790d8f1
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs 424b367
improve docstring on data migration
adamsachs a4a1e5c
run post upgrade index creation script on startup as a non-blocking task
adamsachs a77b905
add missing type annotation
adamsachs 3bcd945
create generic locking utility; lock index creation to prevent concur…
adamsachs 5a7a9d9
switch request service to use locking utility
adamsachs 49cd729
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs 4ebe9f5
migration improvements, tweak on lock release
adamsachs c02c823
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs 16a9ce8
Merge branch 'main' into asachs/cds-refactor-fides
adamsachs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
...pi/alembic/migrations/versions/5474a47c77da_create_staged_resource_ancestor_link_table.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
""" | ||
Creates staged resource ancestor link table. | ||
|
||
Does not populate the table with data; that is handled in a separate data migration. | ||
|
||
Also does not drop the StagedResource.child_diff_statuses column; | ||
that is handled in a separate migration, _after_ the data migration | ||
that populates the staged resource ancestor link table, to prevent | ||
unintended data loss. | ||
|
||
Revision ID: 5474a47c77da | ||
Revises: 440a5b9a3493 | ||
Create Date: 2025-05-27 19:02:43.802783 | ||
|
||
""" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from sqlalchemy.dialects import postgresql | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "5474a47c77da" | ||
down_revision = "440a5b9a3493" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade():
    """Create the empty ``stagedresourceancestor`` table and index ``stagedresource.diff_status``.

    Only columns are declared on the new table. Its primary key, foreign
    key and unique constraints, and its indexes, are deliberately left out
    here: they are deferred until after the data migration that populates
    the table, to avoid slowing that data migration down.
    """

    def timestamp_column(column_name):
        # Nullable, timezone-aware timestamp that defaults to now() server-side.
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=True,
        )

    ancestor_columns = [
        sa.Column("id", sa.String(length=255), nullable=False),
        timestamp_column("created_at"),
        timestamp_column("updated_at"),
        sa.Column("ancestor_urn", sa.String(), nullable=False),
        sa.Column("descendant_urn", sa.String(), nullable=False),
    ]
    op.create_table("stagedresourceancestor", *ancestor_columns)

    # Index StagedResource.diff_status to improve queries against that field.
    diff_status_index_name = op.f("ix_stagedresource_diff_status")
    op.create_index(
        diff_status_index_name,
        "stagedresource",
        ["diff_status"],
        unique=False,
    )
|
||
|
||
def downgrade():
    """Undo ``upgrade``: drop the diff_status index, then the ancestor link table."""
    # Remove the StagedResource.diff_status index created by this migration.
    diff_status_index_name = op.f("ix_stagedresource_diff_status")
    op.drop_index(diff_status_index_name, table_name="stagedresource")

    # Remove the link table itself.
    op.drop_table("stagedresourceancestor")
214 changes: 214 additions & 0 deletions
214
...fides/api/alembic/migrations/versions/bf713b5a021d_staged_resource_ancestor_link_data_.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
"""staged resource ancestor link data migration | ||
|
||
Revision ID: bf713b5a021d | ||
Revises: 5474a47c77da | ||
Create Date: 2025-06-03 09:44:58.769535 | ||
|
||
""" | ||
|
||
import csv | ||
import uuid | ||
from io import StringIO | ||
from pathlib import Path | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from loguru import logger | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "bf713b5a021d" | ||
down_revision = "5474a47c77da" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
# Indexes and constraints are only built inline when fewer than this many
# links were inserted; otherwise they are left to a separate script.
_MAX_LINKS_FOR_INLINE_INDEXES = 1000000


def _collect_ancestor_links(resource_children, known_urns=None):
    """Return ``(ancestor_urn, descendant_urn)`` pairs for all transitive descendants.

    Args:
        resource_children: mapping of resource URN -> iterable of direct
            child URNs. Resources without children need not be present.
        known_urns: optional set of URNs that exist as resources. When given,
            children outside this set are skipped, since a link to a missing
            resource could not satisfy a foreign key on the link table.

    Returns:
        A list of ``(ancestor_urn, descendant_urn)`` tuples. Each descendant
        appears at most once per ancestor, in depth-first pre-order.
    """
    links = []
    for ancestor_urn, children in resource_children.items():
        visited = set()
        # Explicit stack instead of recursion, so deep hierarchies cannot
        # hit the interpreter's recursion limit. Children are pushed in
        # reverse so they are popped in their original order.
        stack = list(reversed(children))
        while stack:
            urn = stack.pop()
            if urn in visited:
                continue
            visited.add(urn)
            if known_urns is not None and urn not in known_urns:
                continue
            links.append((ancestor_urn, urn))
            stack.extend(reversed(resource_children.get(urn, ())))
    return links


def upgrade() -> None:
    """Populate ``stagedresourceancestor`` from ``stagedresource.children``.

    Reads every resource's ``urn`` and ``children`` into memory, expands them
    into one row per (ancestor, transitive descendant) pair, and bulk loads
    the rows with ``COPY ... FROM STDIN``. When fewer than 1,000,000 links
    were inserted, the unique id index, foreign keys, unique constraint and
    lookup indexes are created afterwards; otherwise their creation is
    skipped and a message is logged.
    """
    logger.info("Populating staged resource ancestor links...")

    conn = op.get_bind()

    # Get all resources and their children into memory
    resources_query = sa.text(
        """
        SELECT urn, children
        FROM stagedresource
        """
    )
    resources = conn.execute(resources_query).fetchall()

    known_urns = {resource.urn for resource in resources}
    resource_children = {
        resource.urn: resource.children for resource in resources if resource.children
    }

    # Children that reference no existing resource cannot be linked: the
    # foreign keys created on the link table would reject them.
    dangling_urns = {
        child_urn for children in resource_children.values() for child_urn in children
    } - known_urns
    if dangling_urns:
        logger.warning(
            f"Skipping {len(dangling_urns)} child URNs that have no stagedresource row"
        )

    logger.info(
        f"Recursively processing {len(resource_children)} resources for ancestor links"
    )
    ancestor_links = _collect_ancestor_links(resource_children, known_urns)

    logger.info(f"Found {len(ancestor_links)} ancestor links to insert")

    logger.info(f"Writing {len(ancestor_links)} ancestor links to memory buffer")

    # Serialize the links as CSV in memory, generating a fresh id per row
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer)
    writer.writerow(["id", "ancestor_urn", "descendant_urn"])
    writer.writerows(
        (f"srl_{uuid.uuid4()}", ancestor_urn, descendant_urn)
        for ancestor_urn, descendant_urn in ancestor_links
    )

    # Reset buffer position to start so COPY reads from the header row
    csv_buffer.seek(0)

    logger.info(
        "Copying ancestor links data from memory buffer into stagedresourceancestor table..."
    )
    copy_query = """
        COPY stagedresourceancestor (id, ancestor_urn, descendant_urn)
        FROM STDIN
        WITH (FORMAT CSV, HEADER TRUE)
    """
    # copy_expert is called on the raw DB-API cursor; close it once done.
    cursor = conn.connection.cursor()
    try:
        cursor.copy_expert(copy_query, csv_buffer)
    finally:
        cursor.close()
    logger.info(
        "Completed copying ancestor links data from memory buffer into stagedresourceancestor table"
    )

    if len(ancestor_links) < _MAX_LINKS_FOR_INLINE_INDEXES:

        logger.info("Creating primary key index on stagedresourceancestor table...")

        op.create_index(
            "ix_staged_resource_ancestor_pkey",
            "stagedresourceancestor",
            ["id"],
            unique=True,
        )

        logger.info("Completed creating primary key index")

        logger.info("Creating foreign key constraints stagedresourceancestor table...")

        op.create_foreign_key(
            "fk_staged_resource_ancestor_ancestor",
            "stagedresourceancestor",
            "stagedresource",
            ["ancestor_urn"],
            ["urn"],
            ondelete="CASCADE",
        )

        op.create_foreign_key(
            "fk_staged_resource_ancestor_descendant",
            "stagedresourceancestor",
            "stagedresource",
            ["descendant_urn"],
            ["urn"],
            ondelete="CASCADE",
        )

        logger.info("Completed creating foreign key constraints")

        logger.info("Creating unique constraint on stagedresourceancestor table...")

        op.create_unique_constraint(
            "uq_staged_resource_ancestor",
            "stagedresourceancestor",
            ["ancestor_urn", "descendant_urn"],
        )

        logger.info("Completed creating unique constraint")

        logger.info("Creating indexes on stagedresourceancestor table...")

        op.create_index(
            "ix_staged_resource_ancestor_ancestor",
            "stagedresourceancestor",
            ["ancestor_urn"],
            unique=False,
        )
        op.create_index(
            "ix_staged_resource_ancestor_descendant",
            "stagedresourceancestor",
            ["descendant_urn"],
            unique=False,
        )

        logger.info("Completed creating indexes on stagedresourceancestor table")
    else:
        logger.info(
            "Skipping creation of primary key index, foreign key constraints, unique constraint, and indexes on stagedresourceancestor table because there are more than 1,000,000 ancestor links. Please run `post_upgrade_index_creation.py` to create these indexes and constraints."
        )
|
||
|
||
def downgrade() -> None:
    """Rebuild ``child_diff_statuses`` and undo what ``upgrade`` added to the link table.

    First repopulates ``stagedresource.child_diff_statuses`` from the ancestor
    links: for each ancestor, a JSON object whose keys are the distinct
    ``diff_status`` values of its descendants, each mapped to ``true``.

    Then removes the constraints and indexes that ``upgrade`` may have created
    on ``stagedresourceancestor`` and deletes the copied rows. The table itself
    is not dropped here.
    """
    logger.info(
        "Downgrading staged resource ancestor link data migration, populating child_diff_statuses..."
    )

    # Get child diff statuses for each ancestor
    conn = op.get_bind()
    # Descendants with a NULL diff_status are excluded because
    # jsonb_object_agg raises an error on NULL keys.
    child_diff_statuses_query = sa.text(
        """
        UPDATE stagedresource
        SET child_diff_statuses = child_statuses.status_map
        FROM (
            SELECT
                stagedresourceancestor.ancestor_urn,
                jsonb_object_agg(distinct(stagedresource.diff_status), True) as status_map
            FROM stagedresourceancestor
            JOIN stagedresource ON stagedresourceancestor.descendant_urn = stagedresource.urn
            WHERE stagedresource.diff_status IS NOT NULL
            GROUP BY stagedresourceancestor.ancestor_urn
        ) AS child_statuses
        WHERE stagedresource.urn = child_statuses.ancestor_urn
        """
    )

    result = conn.execute(child_diff_statuses_query)
    updated_rows = result.rowcount

    logger.info(
        f"Downgraded staged resource ancestor link data migration, completed populating child_diff_statuses for {updated_rows} rows"
    )

    # upgrade() skips constraint and index creation for very large tables, so
    # every drop uses IF EXISTS rather than assuming the object is present.
    for constraint_name in (
        "fk_staged_resource_ancestor_ancestor",
        "fk_staged_resource_ancestor_descendant",
        "uq_staged_resource_ancestor",
    ):
        op.execute(
            f"ALTER TABLE stagedresourceancestor DROP CONSTRAINT IF EXISTS {constraint_name}"
        )

    for index_name in (
        "ix_staged_resource_ancestor_descendant",
        "ix_staged_resource_ancestor_ancestor",
        "ix_staged_resource_ancestor_pkey",
    ):
        op.execute(f"DROP INDEX IF EXISTS {index_name}")

    # Remove the rows upgrade() copied in, so a later re-upgrade does not
    # insert duplicate links.
    op.execute("DELETE FROM stagedresourceancestor")
41 changes: 41 additions & 0 deletions
41
src/fides/api/alembic/migrations/versions/c586a56c25e7_remove_child_diff_statuses.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
""" | ||
Removes the stagedresource.child_diff_statuses column. | ||
|
||
This migration is run _after_ the data migration | ||
that populates the staged resource ancestor link table, | ||
to prevent unintended data loss. | ||
|
||
Revision ID: c586a56c25e7 | ||
Revises: bf713b5a021d | ||
Create Date: 2025-06-03 09:47:11.389652 | ||
|
||
""" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from loguru import logger | ||
from sqlalchemy.dialects import postgresql | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "c586a56c25e7" | ||
down_revision = "bf713b5a021d" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade():
    """Drop the ``child_diff_statuses`` column from ``stagedresource``."""
    table_name = "stagedresource"
    obsolete_column = "child_diff_statuses"
    # The column is no longer needed.
    op.drop_column(table_name, obsolete_column)
|
||
|
||
def downgrade():
    """Re-add ``stagedresource.child_diff_statuses`` as a non-null JSONB column defaulting to ``{}``."""
    child_diff_statuses_column = sa.Column(
        "child_diff_statuses",
        postgresql.JSONB(astext_type=sa.Text()),
        server_default="{}",
        nullable=False,
    )
    op.add_column("stagedresource", child_diff_statuses_column)
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.