Skip to content

Storing large access data externally #6199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7b2ff24
Storing large access data externally
galvana Jun 4, 2025
a5c78cd
Adding encryption
galvana Jun 5, 2025
b31a00f
Clean up
galvana Jun 7, 2025
12c51d4
Fixing failing tests
galvana Jun 8, 2025
a71a72e
Fixing tests
galvana Jun 8, 2025
1f4f4ff
Fixing tests
galvana Jun 8, 2025
3103315
Fixing test
galvana Jun 8, 2025
11d6a92
Fixing tests
galvana Jun 8, 2025
956988c
Resetting LARGE_DATA_THRESHOLD_BYTES
galvana Jun 8, 2025
1361bd8
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 8, 2025
f3c52e1
Cleaning up code and adding fallback to privacy request model
galvana Jun 9, 2025
87ebbe9
Fixing static checks
galvana Jun 9, 2025
2a23e82
Removing pytest mark
galvana Jun 9, 2025
9cfd942
Fixing tests
galvana Jun 9, 2025
813d15a
Test cleanup
galvana Jun 9, 2025
39ed44b
Adding more tests
galvana Jun 10, 2025
cceee47
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 10, 2025
f08172b
Static fixes
galvana Jun 10, 2025
b510d17
Fixing test
galvana Jun 10, 2025
a8b52e2
Fixing S3 file limit
galvana Jun 10, 2025
972a01e
Updating large file threshold and optimizing memory usage
galvana Jun 11, 2025
af971aa
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 11, 2025
9ed4144
Changes based on PR feedback
galvana Jun 11, 2025
525ca86
Removing unused file
galvana Jun 11, 2025
48ff1c5
Fixing comment format
galvana Jun 11, 2025
73c9fea
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 11, 2025
e1e79ca
Fixing patch path
galvana Jun 11, 2025
59600c9
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 11, 2025
bfe61ea
Updating change log
galvana Jun 11, 2025
9d31e68
Fixing change log
galvana Jun 11, 2025
c750ecc
Merge branch 'main' into ENG-684-save-large-access-data-externally
galvana Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/fides/api/models/field_types/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .encrypted_large_data import EncryptedLargeDataDescriptor

__all__ = [
"EncryptedLargeDataDescriptor",
]
151 changes: 151 additions & 0 deletions src/fides/api/models/field_types/encrypted_large_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from datetime import datetime
from typing import Any, Optional, Type

from loguru import logger

from fides.api.api.deps import get_autoclose_db_session
from fides.api.schemas.external_storage import ExternalStorageMetadata
from fides.api.service.external_data_storage import (
ExternalDataStorageError,
ExternalDataStorageService,
)
from fides.api.util.data_size import LARGE_DATA_THRESHOLD_BYTES, calculate_data_size


class EncryptedLargeDataDescriptor:
    """
    A Python descriptor for database fields with encrypted external storage fallback.

    The descriptor proxies a private column attribute (``_<field_name>``) on the
    owning model:

    * ``__get__``: if the stored value is an external-storage metadata dict
      (identified by a ``"storage_type"`` key), the payload is downloaded from
      external storage and returned; any other stored value is returned as-is.
    * ``__set__``: if the new value's serialized size exceeds
      ``threshold_bytes``, the value is uploaded to external storage and only
      the metadata dict is persisted on the column; smaller values are stored
      inline. Any previously uploaded file is cleaned up first.
    """

    def __init__(
        self,
        field_name: str,
        empty_default: Optional[Any] = None,
        threshold_bytes: Optional[int] = None,
    ):
        """
        Args:
            field_name: Public attribute name; the backing column attribute is
                expected at ``_<field_name>``.
            empty_default: Value representing "no data" (defaults to ``[]``).
                NOTE(review): this exact object is assigned to instances when
                the field is cleared, so it is shared across instances —
                callers should not mutate it in place.
            threshold_bytes: Size in bytes above which values are stored
                externally (defaults to ``LARGE_DATA_THRESHOLD_BYTES``).
        """
        self.field_name = field_name
        self.private_field = f"_{field_name}"
        self.empty_default = empty_default if empty_default is not None else []
        self.threshold_bytes = threshold_bytes or LARGE_DATA_THRESHOLD_BYTES
        self.model_class: Optional[str] = None
        self.name: Optional[str] = None

    # Descriptor protocol helpers

    def __set_name__(self, owner: Type, name: str) -> None:
        """Record the attribute name and owning class name (used in logs/paths)."""
        self.name = name
        self.model_class = owner.__name__

    def _generate_storage_path(self, instance: Any) -> str:
        """Build a unique storage path: ``<Model>/<id>/<field>/<timestamp>.txt``.

        Raises:
            ValueError: If the instance has no truthy ``id`` attribute.
        """
        instance_id = getattr(instance, "id", None)
        if not instance_id:
            raise ValueError(f"Instance {instance} must have an 'id' attribute")
        # datetime.utcnow() is deprecated in favor of timezone-aware datetimes;
        # strftime output is identical, so the generated path format is unchanged.
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f")
        return f"{self.model_class}/{instance_id}/{self.field_name}/{timestamp}.txt"

    def __get__(self, instance: Any, owner: Type) -> Any:
        """Return the field value, resolving external storage transparently.

        Raises:
            ExternalDataStorageError: If the external payload cannot be retrieved.
        """
        if instance is None:
            # Class-level access returns the descriptor itself.
            return self
        raw_data = getattr(instance, self.private_field)
        if raw_data is None:
            return None
        if isinstance(raw_data, dict) and "storage_type" in raw_data:
            # The column holds external-storage metadata, not the data itself.
            logger.info(
                f"Reading {self.model_class}.{self.field_name} from external storage "
                f"({raw_data.get('storage_type')})"
            )
            try:
                metadata = ExternalStorageMetadata.model_validate(raw_data)
                data = self._retrieve_external_data(metadata)
                record_count = len(data) if isinstance(data, list) else "N/A"
                logger.info(
                    f"Successfully retrieved {self.model_class}.{self.field_name} "
                    f"from external storage (records: {record_count})"
                )
                return data if data is not None else self.empty_default
            except Exception as e:  # pylint: disable=broad-except
                logger.error(
                    f"Failed to retrieve {self.model_class}.{self.field_name} "
                    f"from external storage: {str(e)}"
                )
                raise ExternalDataStorageError(
                    f"Failed to retrieve {self.field_name}: {str(e)}"
                ) from e
        else:
            # Inline value — return unchanged.
            return raw_data

    def __set__(self, instance: Any, value: Any) -> None:
        """Store the field value, offloading to external storage when large.

        Falsy values (``None``, ``[]``, ``{}``, ...) clear the field: any
        external file is deleted and ``empty_default`` is stored inline.
        """
        if not value:
            self._cleanup_external_data(instance)
            setattr(instance, self.private_field, self.empty_default)
            return
        try:
            # Skip the write entirely if the value is unchanged. NOTE(review):
            # this may download the current external payload just to compare.
            current_data = self.__get__(instance, type(instance))
            if current_data == value:
                return
        except Exception:  # pylint: disable=broad-except
            # Best-effort comparison only; fall through and write the new value.
            pass

        data_size = calculate_data_size(value)
        if data_size > self.threshold_bytes:
            logger.info(
                f"{self.model_class}.{self.field_name}: Data size ({data_size:,} bytes) "
                f"exceeds threshold ({self.threshold_bytes:,} bytes), storing externally"
            )
            self._cleanup_external_data(instance)
            metadata = self._store_external_data(instance, value)
            # Persist only the metadata dict on the encrypted column.
            setattr(instance, self.private_field, metadata.model_dump())
        else:
            self._cleanup_external_data(instance)
            setattr(instance, self.private_field, value)

    # External storage helpers

    def _store_external_data(self, instance: Any, data: Any) -> "ExternalStorageMetadata":
        """Upload ``data`` to external storage and return its metadata."""
        storage_path = self._generate_storage_path(instance)
        with get_autoclose_db_session() as session:
            metadata = ExternalDataStorageService.store_data(
                db=session,
                storage_path=storage_path,
                data=data,
            )
            logger.info(
                f"Stored {self.model_class}.{self.field_name} to external storage: {storage_path}"
            )
            return metadata

    @staticmethod
    def _retrieve_external_data(metadata: "ExternalStorageMetadata") -> Any:
        """Download and return the payload described by ``metadata``."""
        with get_autoclose_db_session() as session:
            return ExternalDataStorageService.retrieve_data(
                db=session,
                metadata=metadata,
            )

    def _cleanup_external_data(self, instance: Any) -> None:
        """Delete any external file backing this field; failures are logged only."""
        raw_data = getattr(instance, self.private_field, None)
        if isinstance(raw_data, dict) and "storage_type" in raw_data:
            try:
                metadata = ExternalStorageMetadata.model_validate(raw_data)
                with get_autoclose_db_session() as session:
                    ExternalDataStorageService.delete_data(
                        db=session,
                        metadata=metadata,
                    )
                logger.info(
                    f"Cleaned up external storage for {self.model_class}.{self.field_name}: "
                    f"{metadata.file_key}"
                )
            except Exception as e:  # pylint: disable=broad-except
                # Best-effort cleanup: never let a storage failure break the caller.
                logger.warning(
                    f"Failed to cleanup external {self.field_name}: {str(e)}"
                )

    # Public helper

    def cleanup(self, instance: Any) -> None:
        """Public entry point for deleting this field's external file, if any."""
        self._cleanup_external_data(instance)
17 changes: 15 additions & 2 deletions src/fides/api/models/privacy_request/privacy_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from fides.api.models.client import ClientDetail
from fides.api.models.comment import Comment, CommentReference, CommentReferenceType
from fides.api.models.fides_user import FidesUser
from fides.api.models.field_types import EncryptedLargeDataDescriptor
from fides.api.models.manual_webhook import AccessManualWebhook
from fides.api.models.policy import (
Policy,
Expand Down Expand Up @@ -251,7 +252,8 @@ class PrivacyRequest(
awaiting_email_send_at = Column(DateTime(timezone=True), nullable=True)

# Encrypted filtered access results saved for later retrieval
filtered_final_upload = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
_filtered_final_upload = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
"filtered_final_upload",
StringEncryptedType(
type_in=JSONTypeOverride,
key=CONFIG.security.app_encryption_key,
Expand All @@ -260,6 +262,11 @@ class PrivacyRequest(
),
)

# Use descriptor for automatic external storage handling
filtered_final_upload = EncryptedLargeDataDescriptor(
field_name="filtered_final_upload", empty_default={}
)

# Encrypted filtered access results saved for later retrieval
access_result_urls = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
StringEncryptedType(
Expand Down Expand Up @@ -334,6 +341,7 @@ def delete(self, db: Session) -> None:
deleting this object from the database
"""
self.clear_cached_values()
self.cleanup_external_storage()
Attachment.delete_attachments_for_reference_and_type(
db, self.id, AttachmentReferenceType.privacy_request
)
Expand Down Expand Up @@ -1257,6 +1265,11 @@ def get_consent_results(self) -> Dict[str, int]:
# DSR 2.0 does not cache the results so nothing to do here
return {}

def cleanup_external_storage(self) -> None:
    """Clean up all external storage files for this privacy request"""
    # Access the descriptor from the class to call cleanup: an instance access
    # would invoke __get__ and download the stored data instead of returning
    # the descriptor object itself.
    PrivacyRequest.filtered_final_upload.cleanup(self)

def save_filtered_access_results(
self, db: Session, results: Dict[str, Dict[str, List[Row]]]
) -> None:
Expand Down Expand Up @@ -1544,7 +1557,7 @@ def get_action_required_details(


def _parse_cache_to_checkpoint_action_required(
cache: dict[str, Any]
cache: dict[str, Any],
) -> CheckpointActionRequired:
collection = (
CollectionAddress(
Expand Down
30 changes: 26 additions & 4 deletions src/fides/api/models/privacy_request/request_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
StringEncryptedType,
)

from fides.api.db.base_class import Base # type: ignore[attr-defined]
from fides.api.db.base_class import JSONTypeOverride
from fides.api.db.base_class import Base, JSONTypeOverride # type: ignore[attr-defined]
from fides.api.db.util import EnumColumn
from fides.api.graph.config import (
ROOT_COLLECTION_ADDRESS,
TERMINATOR_ADDRESS,
CollectionAddress,
)
from fides.api.models.field_types import EncryptedLargeDataDescriptor
from fides.api.models.privacy_request.execution_log import (
COMPLETED_EXECUTION_LOG_STATUSES,
)
Expand Down Expand Up @@ -121,7 +121,8 @@ class RequestTask(Base):
# Raw data retrieved from an access request is stored here. This contains all of the
# intermediate data we retrieved, needed for downstream tasks, but hasn't been filtered
# by data category for the end user.
access_data = Column( # An encrypted JSON String - saved as a list of Rows
_access_data = Column( # An encrypted JSON String - saved as a list of Rows
"access_data",
StringEncryptedType(
type_in=JSONTypeOverride,
key=CONFIG.security.app_encryption_key,
Expand All @@ -132,7 +133,8 @@ class RequestTask(Base):

# This is the raw access data saved in erasure format (with placeholders preserved) to perform a masking request.
# First saved on the access node, and then copied to the corresponding erasure node.
data_for_erasures = Column( # An encrypted JSON String - saved as a list of rows
_data_for_erasures = Column( # An encrypted JSON String - saved as a list of rows
"data_for_erasures",
StringEncryptedType(
type_in=JSONTypeOverride,
key=CONFIG.security.app_encryption_key,
Expand All @@ -141,6 +143,15 @@ class RequestTask(Base):
),
)

# Use descriptors for automatic external storage handling
access_data = EncryptedLargeDataDescriptor(
field_name="access_data", empty_default=[]
)

data_for_erasures = EncryptedLargeDataDescriptor(
field_name="data_for_erasures", empty_default=[]
)

# Written after an erasure is completed
rows_masked = Column(Integer)
# Written after a consent request is completed - not all consent
Expand Down Expand Up @@ -183,6 +194,12 @@ def get_cached_task_id(self) -> Optional[str]:
task_id = cache.get(get_async_task_tracking_cache_key(self.id))
return task_id

def cleanup_external_storage(self) -> None:
    """Clean up all external storage files for this request task"""
    # Access the descriptors from the class to call cleanup: an instance access
    # would invoke __get__ and download the stored data instead of returning
    # the descriptor objects themselves.
    RequestTask.access_data.cleanup(self)
    RequestTask.data_for_erasures.cleanup(self)

def get_access_data(self) -> List[Row]:
    """Return the raw access data for this task, defaulting to an empty list."""
    data = self.access_data
    return data if data else []
Expand All @@ -191,6 +208,11 @@ def get_data_for_erasures(self) -> List[Row]:
"""Helper to retrieve erasure data needed to build masking requests or default to empty list"""
return self.data_for_erasures or []

def delete(self, db: Session) -> None:
    """Override delete to cleanup external storage first"""
    # Remove any externally stored access/erasure payloads before the row is
    # deleted so the external files are not orphaned.
    self.cleanup_external_storage()
    super().delete(db)

def update_status(self, db: Session, status: ExecutionLogStatus) -> None:
"""Helper method to update a task's status"""
self.status = status
Expand Down
22 changes: 22 additions & 0 deletions src/fides/api/schemas/external_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Schema for external storage metadata."""

from typing import Optional

from pydantic import Field

from fides.api.schemas.base_class import FidesSchema
from fides.api.schemas.storage.storage import StorageType


class ExternalStorageMetadata(FidesSchema):
    """Metadata for externally stored encrypted data.

    Persisted on the database column in place of the raw value when a payload
    is offloaded to external storage; contains what is needed to locate the
    stored file again later.
    """

    # Storage backend the file was written to.
    storage_type: StorageType
    file_key: str = Field(description="Path/key of the file in external storage")
    filesize: int = Field(description="Size of the stored file in bytes", ge=0)
    storage_key: Optional[str] = Field(
        default=None, description="Storage configuration key used"
    )

    class Config:
        # Validate/serialize enum fields by their plain values so the metadata
        # dict stored on the column contains strings, not enum members.
        # NOTE(review): pydantic v1-style Config; the codebase also uses the
        # v2 API (model_validate/model_dump) — consider ConfigDict.
        use_enum_values = True
Loading
Loading