Skip to content

ENG-663: add StagedResourceAncestors table and remove child_diff_statuses #6185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .fides/db_dataset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2354,8 +2354,6 @@ dataset:
data_categories: [system]
- name: stagedresource
fields:
- name: child_diff_statuses
data_categories: [system]
- name: children
data_categories: [system]
- name: classifications
Expand Down Expand Up @@ -2587,3 +2585,15 @@ dataset:
data_categories: [system.operations]
- name: updated_at
data_categories: [system.operations]
- name: stagedresourceancestor
fields:
- name: id
data_categories: [system.operations]
- name: descendant_urn
data_categories: [system.operations]
- name: ancestor_urn
data_categories: [system.operations]
- name: created_at
data_categories: [system.operations]
- name: updated_at
data_categories: [system.operations]
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
- Added new user types respondent and external_respondent [#6177](https://github.com/ethyca/fides/pull/6177) https://github.com/ethyca/fides/labels/db-migration
- Added Execution logging for uploading Access Packages [#6191](https://github.com/ethyca/fides/pull/6191)
- Added UI for creating shared monitor configurations [#6188](https://github.com/ethyca/fides/pull/6188)
- Added Fides-Client header to http logger middleware [#6195](https://github.com/ethyca/fides/pull/6195)
- Added StagedResourceAncestor table to support dynamic `child_diff_statuses` calculations [#6185](https://github.com/ethyca/fides/pull/6185) https://github.com/ethyca/fides/labels/high-risk https://github.com/ethyca/fides/labels/db-migration

### Changed
- Deprecated `FidesInitialized` event and added `FidesConsentLoaded` and `FidesReady` events for more granular initialization state handling [#6181](https://github.com/ethyca/fides/pull/6181)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Creates staged resource ancestor link table.

Does not populate the table with data; that is handled in a separate data migration.

Also does not drop the StagedResource.child_diff_statuses column;
that is handled in a separate migration, _after_ the data migration
that populates the staged resource ancestor link table, to prevent
unintended data loss.

Revision ID: 5474a47c77da
Revises: 440a5b9a3493
Create Date: 2025-05-27 19:02:43.802783

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
# `revision` is this migration's own id; `down_revision` is the id of the
# migration that must already be applied before this one runs.
revision = "5474a47c77da"
down_revision = "440a5b9a3493"
# No branch label and no extra cross-revision dependency are declared.
branch_labels = None
depends_on = None


def upgrade():
    """Create the bare ``stagedresourceancestor`` table and index ``stagedresource.diff_status``.

    The new table is deliberately created without a primary key, foreign
    keys, unique constraints or indexes: those are deferred until after the
    data migration that populates the table, to avoid slowing that
    migration down.
    """

    def timestamp_column(column_name):
        # nullable, timezone-aware timestamp that the database fills with now()
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=True,
        )

    link_table_columns = [
        sa.Column("id", sa.String(length=255), nullable=False),
        timestamp_column("created_at"),
        timestamp_column("updated_at"),
        sa.Column("ancestor_urn", sa.String(), nullable=False),
        sa.Column("descendant_urn", sa.String(), nullable=False),
    ]
    op.create_table("stagedresourceancestor", *link_table_columns)

    # Non-unique index on StagedResource.diff_status to improve queries
    # against the diff_status field.
    diff_status_index = op.f("ix_stagedresource_diff_status")
    op.create_index(
        diff_status_index,
        "stagedresource",
        ["diff_status"],
        unique=False,
    )


def downgrade():
    """Reverse ``upgrade``: drop the diff_status index, then the link table."""
    # The index lives on the pre-existing `stagedresource` table, so it has
    # to be removed explicitly.
    diff_status_index = op.f("ix_stagedresource_diff_status")
    op.drop_index(diff_status_index, table_name="stagedresource")

    link_table = "stagedresourceancestor"
    op.drop_table(link_table)
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""staged resource ancestor link data migration

Revision ID: bf713b5a021d
Revises: 5474a47c77da
Create Date: 2025-06-03 09:44:58.769535

"""

import csv
import uuid
from io import StringIO
from pathlib import Path

import sqlalchemy as sa
from alembic import op
from loguru import logger

# revision identifiers, used by Alembic.
# `revision` is this migration's own id; `down_revision` is the id of the
# migration that must already be applied before this one runs.
revision = "bf713b5a021d"
down_revision = "5474a47c77da"
# No branch label and no extra cross-revision dependency are declared.
branch_labels = None
depends_on = None


def _collect_ancestor_links(resource_children, known_urns):
    """Return ``(links, skipped)`` for every ancestor/descendant relationship.

    ``resource_children`` maps a resource URN to its non-empty list of direct
    child URNs; ``known_urns`` is the set of URNs that have a ``stagedresource``
    row. ``links`` is a list of ``(ancestor_urn, descendant_urn)`` tuples, one
    per distinct descendant of each ancestor. ``skipped`` counts child
    references that point at a URN with no row; those are left out because
    they could never satisfy the foreign keys added after the copy.
    """
    links = []
    skipped = 0
    for ancestor_urn, children in resource_children.items():
        visited = set()
        # Explicit stack of child iterators instead of recursion: same
        # depth-first, pre-order walk, but not bounded by the interpreter's
        # recursion limit on long chains.
        pending = [iter(children)]
        while pending:
            for child_urn in pending[-1]:
                if child_urn in visited:
                    # already linked to this ancestor (shared child or cycle)
                    continue
                visited.add(child_urn)
                if child_urn not in known_urns:
                    skipped += 1
                    continue
                links.append((ancestor_urn, child_urn))
                grandchildren = resource_children.get(child_urn)
                if grandchildren:
                    # descend now; the parent iterator resumes afterwards
                    pending.append(iter(grandchildren))
                    break
            else:
                pending.pop()
    return links, skipped


def _links_to_csv_buffer(links):
    """Serialise ``links`` to an in-memory CSV (with header) rewound to the start.

    Each row gets a freshly generated ``srl_<uuid4>`` id.
    """
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer)
    writer.writerow(["id", "ancestor_urn", "descendant_urn"])
    writer.writerows(
        (f"srl_{uuid.uuid4()}", ancestor_urn, descendant_urn)
        for ancestor_urn, descendant_urn in links
    )
    csv_buffer.seek(0)
    return csv_buffer


def _create_constraints_and_indexes():
    """Add the unique id index, foreign keys, unique constraint and lookup indexes."""
    logger.info("Creating primary key index on stagedresourceancestor table...")
    op.create_index(
        "ix_staged_resource_ancestor_pkey",
        "stagedresourceancestor",
        ["id"],
        unique=True,
    )
    logger.info("Completed creating primary key index")

    logger.info("Creating foreign key constraints stagedresourceancestor table...")
    for constraint_name, local_column in (
        ("fk_staged_resource_ancestor_ancestor", "ancestor_urn"),
        ("fk_staged_resource_ancestor_descendant", "descendant_urn"),
    ):
        op.create_foreign_key(
            constraint_name,
            "stagedresourceancestor",
            "stagedresource",
            [local_column],
            ["urn"],
            ondelete="CASCADE",
        )
    logger.info("Completed creating foreign key constraints")

    logger.info("Creating unique constraint on stagedresourceancestor table...")
    op.create_unique_constraint(
        "uq_staged_resource_ancestor",
        "stagedresourceancestor",
        ["ancestor_urn", "descendant_urn"],
    )
    logger.info("Completed creating unique constraint")

    logger.info("Creating indexes on stagedresourceancestor table...")
    op.create_index(
        "ix_staged_resource_ancestor_ancestor",
        "stagedresourceancestor",
        ["ancestor_urn"],
        unique=False,
    )
    op.create_index(
        "ix_staged_resource_ancestor_descendant",
        "stagedresourceancestor",
        ["descendant_urn"],
        unique=False,
    )
    logger.info("Completed creating indexes on stagedresourceancestor table")


def upgrade():
    """Populate ``stagedresourceancestor`` from ``stagedresource.children``.

    Steps:
      1. Load every ``(urn, children)`` row into memory.
      2. Walk each resource's children transitively to produce one link per
         (ancestor, descendant) pair.
      3. Bulk-load the links with ``COPY ... FROM STDIN`` through the DBAPI
         cursor's ``copy_expert``.
      4. If fewer than 1,000,000 links were written, create the constraints
         and indexes now; otherwise log that they must be created separately.
    """
    logger.info("Populating staged resource ancestor links...")

    conn = op.get_bind()

    # Wrapped in text(): plain strings are not accepted by Connection.execute
    # on SQLAlchemy 2.0 and are deprecated on 1.4.
    resources = conn.execute(
        sa.text(
            """
            SELECT urn, children
            FROM stagedresource
            """
        )
    ).fetchall()

    known_urns = {resource.urn for resource in resources}
    # Only resources that actually have children can act as ancestors.
    resource_children = {
        resource.urn: resource.children
        for resource in resources
        if resource.children
    }

    logger.info(
        f"Recursively processing {len(resource_children)} resources for ancestor links"
    )
    ancestor_links, skipped = _collect_ancestor_links(resource_children, known_urns)
    if skipped:
        logger.warning(
            f"Skipped {skipped} child references to URNs that have no stagedresource row"
        )

    logger.info(f"Found {len(ancestor_links)} ancestor links to insert")

    logger.info(f"Writing {len(ancestor_links)} ancestor links to memory buffer")
    csv_buffer = _links_to_csv_buffer(ancestor_links)

    logger.info(
        "Copying ancestor links data from memory buffer into stagedresourceancestor table..."
    )
    copy_query = """
        COPY stagedresourceancestor (id, ancestor_urn, descendant_urn)
        FROM STDIN
        WITH (FORMAT CSV, HEADER TRUE)
    """
    cursor = conn.connection.cursor()
    try:
        cursor.copy_expert(copy_query, csv_buffer)
    finally:
        # release the raw cursor even if the COPY fails
        cursor.close()
    logger.info(
        "Completed copying ancestor links data from memory buffer into stagedresourceancestor table"
    )

    # At or above this many links, constraint/index creation is left to a
    # separate step instead of running inside the migration.
    index_creation_link_limit = 1000000
    if len(ancestor_links) < index_creation_link_limit:
        _create_constraints_and_indexes()
    else:
        logger.info(
            "Skipping creation of primary key index, foreign key constraints, unique constraint, and indexes on stagedresourceancestor table because there are more than 1,000,000 ancestor links. Please run `post_upgrade_index_creation.py` to create these indexes and constraints."
        )


def downgrade():
    """Rebuild ``child_diff_statuses`` from the link table, then undo ``upgrade``.

    First, every ancestor's ``stagedresource.child_diff_statuses`` is set to a
    JSONB object whose keys are the distinct ``diff_status`` values of its
    descendants (each mapped to ``true``). This assumes the
    ``child_diff_statuses`` column already exists when this runs.

    Then everything ``upgrade`` may have added is removed: foreign keys, the
    unique constraint, the indexes and the copied rows. ``IF EXISTS`` is used
    throughout because ``upgrade`` skips constraint/index creation when there
    are 1,000,000 or more links, so they may legitimately be absent.
    """
    logger.info(
        "Downgrading staged resource ancestor link data migration, populating child_diff_statuses..."
    )

    # Get child diff statuses for each ancestor
    conn = op.get_bind()
    # text() keeps this executable on SQLAlchemy 2.0, which rejects raw
    # strings. NULL diff_status values are filtered out because
    # jsonb_object_agg raises on a NULL key.
    child_diff_statuses_query = sa.text(
        """
        UPDATE stagedresource
        SET child_diff_statuses = child_statuses.status_map
        FROM (
            SELECT
                stagedresourceancestor.ancestor_urn,
                jsonb_object_agg(distinct(stagedresource.diff_status), True) as status_map
            FROM stagedresourceancestor
            JOIN stagedresource ON stagedresourceancestor.descendant_urn = stagedresource.urn
            WHERE stagedresource.diff_status IS NOT NULL
            GROUP BY stagedresourceancestor.ancestor_urn
        ) AS child_statuses
        WHERE stagedresource.urn = child_statuses.ancestor_urn
        """
    )
    result = conn.execute(child_diff_statuses_query)
    updated_rows = result.rowcount

    logger.info(
        f"Downgraded staged resource ancestor link data migration, completed populating child_diff_statuses for {updated_rows} rows"
    )

    # Remove the constraints added by upgrade (names are fixed literals, so
    # interpolating them into the statement is safe).
    for constraint_name in (
        "fk_staged_resource_ancestor_ancestor",
        "fk_staged_resource_ancestor_descendant",
        "uq_staged_resource_ancestor",
    ):
        op.execute(
            f"ALTER TABLE stagedresourceancestor DROP CONSTRAINT IF EXISTS {constraint_name}"
        )

    # Remove the indexes added by upgrade.
    for index_name in (
        "ix_staged_resource_ancestor_descendant",
        "ix_staged_resource_ancestor_ancestor",
        "ix_staged_resource_ancestor_pkey",
    ):
        op.execute(f"DROP INDEX IF EXISTS {index_name}")

    # Remove the copied rows so that re-running upgrade starts from an empty
    # table instead of inserting every link a second time.
    op.execute("TRUNCATE TABLE stagedresourceancestor")
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Removes the stagedresource.child_diff_statuses column.

This migration is run _after_ the data migration
that populates the staged resource ancestor link table,
to prevent unintended data loss.

Revision ID: c586a56c25e7
Revises: bf713b5a021d
Create Date: 2025-06-03 09:47:11.389652

"""

import sqlalchemy as sa
from alembic import op
from loguru import logger
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
# `revision` is this migration's own id; `down_revision` is the id of the
# migration that must already be applied before this one runs.
revision = "c586a56c25e7"
down_revision = "bf713b5a021d"
# No branch label and no extra cross-revision dependency are declared.
branch_labels = None
depends_on = None


def upgrade():
    """Drop the ``child_diff_statuses`` column from ``stagedresource``."""
    # The column is no longer needed, so it is removed outright.
    table_name = "stagedresource"
    obsolete_column = "child_diff_statuses"
    op.drop_column(table_name, obsolete_column)


def downgrade():
    """Re-add ``stagedresource.child_diff_statuses`` as a non-null JSONB column.

    The server default of ``{}`` gives existing rows a value, which the
    NOT NULL constraint requires.
    """
    restored_column = sa.Column(
        "child_diff_statuses",
        postgresql.JSONB(astext_type=sa.Text()),
        server_default="{}",
        nullable=False,
    )
    op.add_column("stagedresource", restored_column)
Loading
Loading