Skip to content

Commit cd2c209

Browse files
authored
Storing large access data externally (#6199)
1 parent 4d4d933 commit cd2c209

20 files changed

+2302
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
2525
- AWS SES notification service now supports assumed roles through environment variable configuration through `FIDES__CREDENTIALS__NOTIFICATIONS__AWS_SES_ASSUME_ROLE_ARN` [#6206](https://github.com/ethyca/fides/pull/6206)
2626
- Added ManualTask and ManualTaskReference models, foundational for ManualDSRs [#6205](https://github.com/ethyca/fides/pull/6205) https://github.com/ethyca/fides/labels/db-migration
2727
- Added settings for read-only Redis instance [#6217](https://github.com/ethyca/fides/pull/6217)
28+
- Added support for large (>1GB) database columns by writing the contents to external storage [#6199](https://github.com/ethyca/fides/pull/6199)
2829

2930
### Changed
3031

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .encrypted_large_data import EncryptedLargeDataDescriptor
2+
3+
__all__ = [
4+
"EncryptedLargeDataDescriptor",
5+
]
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
import copy
from datetime import datetime, timezone
from typing import Any, Optional, Type

from loguru import logger

from fides.api.api.deps import get_autoclose_db_session
from fides.api.schemas.external_storage import ExternalStorageMetadata
from fides.api.service.external_data_storage import (
    ExternalDataStorageError,
    ExternalDataStorageService,
)
from fides.api.util.data_size import LARGE_DATA_THRESHOLD_BYTES, calculate_data_size


15+
class EncryptedLargeDataDescriptor:
16+
"""
17+
A Python descriptor for database fields with encrypted external storage fallback.
18+
19+
See the original implementation for detailed docstrings.
20+
"""
21+
22+
def __init__(
23+
self,
24+
field_name: str,
25+
empty_default: Optional[Any] = None,
26+
threshold_bytes: Optional[int] = None,
27+
):
28+
self.field_name = field_name
29+
self.private_field = f"_{field_name}"
30+
self.empty_default = empty_default if empty_default is not None else []
31+
self.threshold_bytes = threshold_bytes or LARGE_DATA_THRESHOLD_BYTES
32+
self.model_class: Optional[str] = None
33+
self.name: Optional[str] = None
34+
35+
# Descriptor protocol helpers
36+
37+
def __set_name__(
38+
self, owner: Type, name: str
39+
) -> None: # noqa: D401 (docstring in orig file)
40+
self.name = name
41+
self.model_class = owner.__name__
42+
43+
def _generate_storage_path(self, instance: Any) -> str:
44+
instance_id = getattr(instance, "id", None)
45+
if not instance_id:
46+
raise ValueError(f"Instance {instance} must have an 'id' attribute")
47+
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f")
48+
return f"{self.model_class}/{instance_id}/{self.field_name}/{timestamp}.txt"
49+
50+
def __get__(self, instance: Any, owner: Type) -> Any: # noqa: D401
51+
if instance is None:
52+
return self
53+
raw_data = getattr(instance, self.private_field)
54+
if raw_data is None:
55+
return None
56+
if isinstance(raw_data, dict) and "storage_type" in raw_data:
57+
logger.info(
58+
f"Reading {self.model_class}.{self.field_name} from external storage "
59+
f"({raw_data.get('storage_type')})"
60+
)
61+
try:
62+
metadata = ExternalStorageMetadata.model_validate(raw_data)
63+
data = self._retrieve_external_data(metadata)
64+
record_count = len(data) if isinstance(data, list) else "N/A"
65+
logger.info(
66+
f"Successfully retrieved {self.model_class}.{self.field_name} "
67+
f"from external storage (records: {record_count})"
68+
)
69+
return data if data is not None else self.empty_default
70+
except Exception as e: # pylint: disable=broad-except
71+
logger.error(
72+
f"Failed to retrieve {self.model_class}.{self.field_name} "
73+
f"from external storage: {str(e)}"
74+
)
75+
raise ExternalDataStorageError(
76+
f"Failed to retrieve {self.field_name}: {str(e)}"
77+
) from e
78+
else:
79+
return raw_data
80+
81+
def __set__(self, instance: Any, value: Any) -> None: # noqa: D401
82+
if not value:
83+
self._cleanup_external_data(instance)
84+
setattr(instance, self.private_field, self.empty_default)
85+
return
86+
try:
87+
current_data = self.__get__(instance, type(instance))
88+
if current_data == value:
89+
return
90+
except Exception: # pylint: disable=broad-except
91+
pass
92+
93+
data_size = calculate_data_size(value)
94+
if data_size > self.threshold_bytes:
95+
logger.info(
96+
f"{self.model_class}.{self.field_name}: Data size ({data_size:,} bytes) "
97+
f"exceeds threshold ({self.threshold_bytes:,} bytes), storing externally"
98+
)
99+
self._cleanup_external_data(instance)
100+
metadata = self._store_external_data(instance, value)
101+
setattr(instance, self.private_field, metadata.model_dump())
102+
else:
103+
self._cleanup_external_data(instance)
104+
setattr(instance, self.private_field, value)
105+
106+
# External storage helpers
107+
108+
def _store_external_data(self, instance: Any, data: Any) -> ExternalStorageMetadata:
109+
storage_path = self._generate_storage_path(instance)
110+
with get_autoclose_db_session() as session:
111+
metadata = ExternalDataStorageService.store_data(
112+
db=session,
113+
storage_path=storage_path,
114+
data=data,
115+
)
116+
logger.info(
117+
f"Stored {self.model_class}.{self.field_name} to external storage: {storage_path}"
118+
)
119+
return metadata
120+
121+
@staticmethod
122+
def _retrieve_external_data(metadata: ExternalStorageMetadata) -> Any: # noqa: D401
123+
with get_autoclose_db_session() as session:
124+
return ExternalDataStorageService.retrieve_data(
125+
db=session,
126+
metadata=metadata,
127+
)
128+
129+
def _cleanup_external_data(self, instance: Any) -> None: # noqa: D401
130+
raw_data = getattr(instance, self.private_field, None)
131+
if isinstance(raw_data, dict) and "storage_type" in raw_data:
132+
try:
133+
metadata = ExternalStorageMetadata.model_validate(raw_data)
134+
with get_autoclose_db_session() as session:
135+
ExternalDataStorageService.delete_data(
136+
db=session,
137+
metadata=metadata,
138+
)
139+
logger.info(
140+
f"Cleaned up external storage for {self.model_class}.{self.field_name}: "
141+
f"{metadata.file_key}"
142+
)
143+
except Exception as e: # pylint: disable=broad-except
144+
logger.warning(
145+
f"Failed to cleanup external {self.field_name}: {str(e)}"
146+
)
147+
148+
# Public helper
149+
150+
def cleanup(self, instance: Any) -> None: # noqa: D401
151+
self._cleanup_external_data(instance)

src/fides/api/models/privacy_request/privacy_request.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from fides.api.models.client import ClientDetail
4949
from fides.api.models.comment import Comment, CommentReference, CommentReferenceType
5050
from fides.api.models.fides_user import FidesUser
51+
from fides.api.models.field_types import EncryptedLargeDataDescriptor
5152
from fides.api.models.manual_webhook import AccessManualWebhook
5253
from fides.api.models.policy import (
5354
Policy,
@@ -251,7 +252,8 @@ class PrivacyRequest(
251252
awaiting_email_send_at = Column(DateTime(timezone=True), nullable=True)
252253

253254
# Encrypted filtered access results saved for later retrieval
254-
filtered_final_upload = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
255+
_filtered_final_upload = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
256+
"filtered_final_upload",
255257
StringEncryptedType(
256258
type_in=JSONTypeOverride,
257259
key=CONFIG.security.app_encryption_key,
@@ -260,6 +262,11 @@ class PrivacyRequest(
260262
),
261263
)
262264

265+
# Use descriptor for automatic external storage handling
266+
filtered_final_upload = EncryptedLargeDataDescriptor(
267+
field_name="filtered_final_upload", empty_default={}
268+
)
269+
263270
# Encrypted filtered access results saved for later retrieval
264271
access_result_urls = Column( # An encrypted JSON String - Dict[Dict[str, List[Row]]] - rule keys mapped to the filtered access results
265272
StringEncryptedType(
@@ -334,6 +341,7 @@ def delete(self, db: Session) -> None:
334341
deleting this object from the database
335342
"""
336343
self.clear_cached_values()
344+
self.cleanup_external_storage()
337345
Attachment.delete_attachments_for_reference_and_type(
338346
db, self.id, AttachmentReferenceType.privacy_request
339347
)
@@ -1257,6 +1265,11 @@ def get_consent_results(self) -> Dict[str, int]:
12571265
# DSR 2.0 does not cache the results so nothing to do here
12581266
return {}
12591267

1268+
def cleanup_external_storage(self) -> None:
1269+
"""Clean up all external storage files for this privacy request"""
1270+
# Access the descriptor from the class to call cleanup
1271+
PrivacyRequest.filtered_final_upload.cleanup(self)
1272+
12601273
def save_filtered_access_results(
12611274
self, db: Session, results: Dict[str, Dict[str, List[Row]]]
12621275
) -> None:
@@ -1544,7 +1557,7 @@ def get_action_required_details(
15441557

15451558

15461559
def _parse_cache_to_checkpoint_action_required(
1547-
cache: dict[str, Any]
1560+
cache: dict[str, Any],
15481561
) -> CheckpointActionRequired:
15491562
collection = (
15501563
CollectionAddress(

src/fides/api/models/privacy_request/request_task.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
StringEncryptedType,
1515
)
1616

17-
from fides.api.db.base_class import Base # type: ignore[attr-defined]
18-
from fides.api.db.base_class import JSONTypeOverride
17+
from fides.api.db.base_class import Base, JSONTypeOverride # type: ignore[attr-defined]
1918
from fides.api.db.util import EnumColumn
2019
from fides.api.graph.config import (
2120
ROOT_COLLECTION_ADDRESS,
2221
TERMINATOR_ADDRESS,
2322
CollectionAddress,
2423
)
24+
from fides.api.models.field_types import EncryptedLargeDataDescriptor
2525
from fides.api.models.privacy_request.execution_log import (
2626
COMPLETED_EXECUTION_LOG_STATUSES,
2727
)
@@ -121,7 +121,8 @@ class RequestTask(Base):
121121
# Raw data retrieved from an access request is stored here. This contains all of the
122122
# intermediate data we retrieved, needed for downstream tasks, but hasn't been filtered
123123
# by data category for the end user.
124-
access_data = Column( # An encrypted JSON String - saved as a list of Rows
124+
_access_data = Column( # An encrypted JSON String - saved as a list of Rows
125+
"access_data",
125126
StringEncryptedType(
126127
type_in=JSONTypeOverride,
127128
key=CONFIG.security.app_encryption_key,
@@ -132,7 +133,8 @@ class RequestTask(Base):
132133

133134
# This is the raw access data saved in erasure format (with placeholders preserved) to perform a masking request.
134135
# First saved on the access node, and then copied to the corresponding erasure node.
135-
data_for_erasures = Column( # An encrypted JSON String - saved as a list of rows
136+
_data_for_erasures = Column( # An encrypted JSON String - saved as a list of rows
137+
"data_for_erasures",
136138
StringEncryptedType(
137139
type_in=JSONTypeOverride,
138140
key=CONFIG.security.app_encryption_key,
@@ -141,6 +143,15 @@ class RequestTask(Base):
141143
),
142144
)
143145

146+
# Use descriptors for automatic external storage handling
147+
access_data = EncryptedLargeDataDescriptor(
148+
field_name="access_data", empty_default=[]
149+
)
150+
151+
data_for_erasures = EncryptedLargeDataDescriptor(
152+
field_name="data_for_erasures", empty_default=[]
153+
)
154+
144155
# Written after an erasure is completed
145156
rows_masked = Column(Integer)
146157
# Written after a consent request is completed - not all consent
@@ -183,6 +194,12 @@ def get_cached_task_id(self) -> Optional[str]:
183194
task_id = cache.get(get_async_task_tracking_cache_key(self.id))
184195
return task_id
185196

197+
def cleanup_external_storage(self) -> None:
198+
"""Clean up all external storage files for this request task"""
199+
# Access the descriptor from the class to call cleanup
200+
RequestTask.access_data.cleanup(self)
201+
RequestTask.data_for_erasures.cleanup(self)
202+
186203
def get_access_data(self) -> List[Row]:
187204
"""Helper to retrieve access data or default to empty list"""
188205
return self.access_data or []
@@ -191,6 +208,11 @@ def get_data_for_erasures(self) -> List[Row]:
191208
"""Helper to retrieve erasure data needed to build masking requests or default to empty list"""
192209
return self.data_for_erasures or []
193210

211+
def delete(self, db: Session) -> None:
212+
"""Override delete to cleanup external storage first"""
213+
self.cleanup_external_storage()
214+
super().delete(db)
215+
194216
def update_status(self, db: Session, status: ExecutionLogStatus) -> None:
195217
"""Helper method to update a task's status"""
196218
self.status = status
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
"""Schema for external storage metadata."""
2+
3+
from typing import Optional
4+
5+
from pydantic import Field
6+
7+
from fides.api.schemas.base_class import FidesSchema
8+
from fides.api.schemas.storage.storage import StorageType
9+
10+
11+
class ExternalStorageMetadata(FidesSchema):
    """Metadata for externally stored encrypted data.

    This is the record kept in the database column in place of the real
    (oversized) payload; it identifies where the payload actually lives.
    """

    # Which storage backend holds the file (enum from the storage schema).
    storage_type: StorageType
    file_key: str = Field(description="Path/key of the file in external storage")
    filesize: int = Field(description="Size of the stored file in bytes", ge=0)
    # Optional because a default storage configuration may be used —
    # TODO confirm against ExternalDataStorageService behavior.
    storage_key: Optional[str] = Field(
        default=None, description="Storage configuration key used"
    )

    class Config:
        # Serialize storage_type as its raw enum value rather than the
        # enum member itself.
        use_enum_values = True

0 commit comments

Comments
 (0)