Storing large access data externally #6199
Merged

Commits (31 total; changes shown from 25 commits)
7b2ff24  Storing large access data externally (galvana)
a5c78cd  Adding encryption (galvana)
b31a00f  Clean up (galvana)
12c51d4  Fixing failing tests (galvana)
a71a72e  Fixing tests (galvana)
1f4f4ff  Fixing tests (galvana)
3103315  Fixing test (galvana)
11d6a92  Fixing tests (galvana)
956988c  Resetting LARGE_DATA_THRESHOLD_BYTES (galvana)
1361bd8  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
f3c52e1  Cleaning up code and adding fallback to privacy request model (galvana)
87ebbe9  Fixing static checks (galvana)
2a23e82  Removing pytest mark (galvana)
9cfd942  Fixing tests (galvana)
813d15a  Test cleanup (galvana)
39ed44b  Adding more tests (galvana)
cceee47  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
f08172b  Static fixes (galvana)
b510d17  Fixing test (galvana)
a8b52e2  Fixing S3 file limit (galvana)
972a01e  Updating large file threshold and optimizing memory usage (galvana)
af971aa  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
9ed4144  Changes based on PR feedback (galvana)
525ca86  Removing unused file (galvana)
48ff1c5  Fixing comment format (galvana)
73c9fea  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
e1e79ca  Fixing patch path (galvana)
59600c9  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
bfe61ea  Updating change log (galvana)
9d31e68  Fixing change log (galvana)
c750ecc  Merge branch 'main' into ENG-684-save-large-access-data-externally (galvana)
src/fides/api/models/field_types/__init__.py (5 additions & 0 deletions)
@@ -0,0 +1,5 @@
from .encrypted_large_data import EncryptedLargeDataDescriptor

__all__ = [
    "EncryptedLargeDataDescriptor",
]
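With this re-export in place, callers can pull the descriptor straight from the package. A minimal sketch (not part of the diff itself; it only exercises the export above):

# Both imports resolve to the same class; the package-level export mirrors
# the module added below.
from fides.api.models.field_types import EncryptedLargeDataDescriptor
from fides.api.models.field_types.encrypted_large_data import (
    EncryptedLargeDataDescriptor as _SameClass,
)

assert EncryptedLargeDataDescriptor is _SameClass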
src/fides/api/models/field_types/encrypted_large_data.py (151 additions & 0 deletions)
@@ -0,0 +1,151 @@
from datetime import datetime
from typing import Any, Optional, Type

from loguru import logger

from fides.api.api.deps import get_autoclose_db_session
from fides.api.schemas.external_storage import ExternalStorageMetadata
from fides.api.service.external_data_storage import (
    ExternalDataStorageError,
    ExternalDataStorageService,
)
from fides.api.util.data_size import LARGE_DATA_THRESHOLD_BYTES, calculate_data_size


class EncryptedLargeDataDescriptor:
    """
    A Python descriptor for database fields with encrypted external storage fallback.

    See the original implementation for detailed docstrings.
    """

    def __init__(
        self,
        field_name: str,
        empty_default: Optional[Any] = None,
        threshold_bytes: Optional[int] = None,
    ):
        self.field_name = field_name
        self.private_field = f"_{field_name}"
        self.empty_default = empty_default if empty_default is not None else []
        self.threshold_bytes = threshold_bytes or LARGE_DATA_THRESHOLD_BYTES
        self.model_class: Optional[str] = None
        self.name: Optional[str] = None

    # Descriptor protocol helpers

    def __set_name__(
        self, owner: Type, name: str
    ) -> None:  # noqa: D401 (docstring in orig file)
        self.name = name
        self.model_class = owner.__name__

    def _generate_storage_path(self, instance: Any) -> str:
        instance_id = getattr(instance, "id", None)
        if not instance_id:
            raise ValueError(f"Instance {instance} must have an 'id' attribute")
        timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f")
        return f"{self.model_class}/{instance_id}/{self.field_name}/{timestamp}.txt"

    def __get__(self, instance: Any, owner: Type) -> Any:  # noqa: D401
        if instance is None:
            return self
        raw_data = getattr(instance, self.private_field)
        if raw_data is None:
            return None
        if isinstance(raw_data, dict) and "storage_type" in raw_data:
            logger.info(
                f"Reading {self.model_class}.{self.field_name} from external storage "
                f"({raw_data.get('storage_type')})"
            )
            try:
                metadata = ExternalStorageMetadata.model_validate(raw_data)
                data = self._retrieve_external_data(metadata)
                record_count = len(data) if isinstance(data, list) else "N/A"
                logger.info(
                    f"Successfully retrieved {self.model_class}.{self.field_name} "
                    f"from external storage (records: {record_count})"
                )
                return data if data is not None else self.empty_default
            except Exception as e:  # pylint: disable=broad-except
                logger.error(
                    f"Failed to retrieve {self.model_class}.{self.field_name} "
                    f"from external storage: {str(e)}"
                )
                raise ExternalDataStorageError(
                    f"Failed to retrieve {self.field_name}: {str(e)}"
                ) from e
        else:
            return raw_data

    def __set__(self, instance: Any, value: Any) -> None:  # noqa: D401
        if not value:
            self._cleanup_external_data(instance)
            setattr(instance, self.private_field, self.empty_default)
            return
        try:
            current_data = self.__get__(instance, type(instance))
            if current_data == value:
                return
        except Exception:  # pylint: disable=broad-except
            pass

        data_size = calculate_data_size(value)
        if data_size > self.threshold_bytes:
            logger.info(
                f"{self.model_class}.{self.field_name}: Data size ({data_size:,} bytes) "
                f"exceeds threshold ({self.threshold_bytes:,} bytes), storing externally"
            )
            self._cleanup_external_data(instance)
            metadata = self._store_external_data(instance, value)
            setattr(instance, self.private_field, metadata.model_dump())
        else:
            self._cleanup_external_data(instance)
            setattr(instance, self.private_field, value)

    # External storage helpers

    def _store_external_data(self, instance: Any, data: Any) -> ExternalStorageMetadata:
        storage_path = self._generate_storage_path(instance)
        with get_autoclose_db_session() as session:
            metadata = ExternalDataStorageService.store_data(
                db=session,
                storage_path=storage_path,
                data=data,
            )
            logger.info(
                f"Stored {self.model_class}.{self.field_name} to external storage: {storage_path}"
            )
            return metadata

    @staticmethod
    def _retrieve_external_data(metadata: ExternalStorageMetadata) -> Any:  # noqa: D401
        with get_autoclose_db_session() as session:
            return ExternalDataStorageService.retrieve_data(
                db=session,
                metadata=metadata,
            )

    def _cleanup_external_data(self, instance: Any) -> None:  # noqa: D401
        raw_data = getattr(instance, self.private_field, None)
        if isinstance(raw_data, dict) and "storage_type" in raw_data:
            try:
                metadata = ExternalStorageMetadata.model_validate(raw_data)
                with get_autoclose_db_session() as session:
                    ExternalDataStorageService.delete_data(
                        db=session,
                        metadata=metadata,
                    )
                logger.info(
                    f"Cleaned up external storage for {self.model_class}.{self.field_name}: "
                    f"{metadata.file_key}"
                )
            except Exception as e:  # pylint: disable=broad-except
                logger.warning(
                    f"Failed to cleanup external {self.field_name}: {str(e)}"
                )

    # Public helper

    def cleanup(self, instance: Any) -> None:  # noqa: D401
        self._cleanup_external_data(instance)
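For context, here is a minimal sketch of how a model might attach the descriptor. The model, table, and column types below are hypothetical and not taken from this PR; the real privacy request models may wire this up differently (for example with an encrypted column type):

# Hypothetical illustration only: the model and column shown here are
# assumptions, not part of this PR.
from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base

from fides.api.models.field_types import EncryptedLargeDataDescriptor

Base = declarative_base()


class ExampleRecord(Base):
    __tablename__ = "example_record"

    id = Column(String(255), primary_key=True)

    # Backing column: holds either the inline value or, for large payloads,
    # the ExternalStorageMetadata dict written by the descriptor.
    _access_data = Column("access_data", JSONB, nullable=True)

    # Public attribute: reads and writes go through the descriptor, which
    # stores oversized values externally and leaves small ones inline.
    access_data = EncryptedLargeDataDescriptor(
        field_name="access_data", empty_default=[]
    )

Assigning a value below the threshold keeps it inline in the column; a larger value is written to external storage and only the metadata dict is persisted, and reading the attribute transparently retrieves the stored data again.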
src/fides/api/schemas/external_storage.py (22 additions & 0 deletions)
@@ -0,0 +1,22 @@
"""Schema for external storage metadata."""

from typing import Optional

from pydantic import Field

from fides.api.schemas.base_class import FidesSchema
from fides.api.schemas.storage.storage import StorageType


class ExternalStorageMetadata(FidesSchema):
    """Metadata for externally stored encrypted data."""

    storage_type: StorageType
    file_key: str = Field(description="Path/key of the file in external storage")
    filesize: int = Field(description="Size of the stored file in bytes", ge=0)
    storage_key: Optional[str] = Field(
        default=None, description="Storage configuration key used"
    )

    class Config:
        use_enum_values = True
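A small sketch of how this metadata round-trips through the model's JSON column. The values below are purely illustrative; only the field names come from the schema above, and the StorageType member is an assumption:

# Illustrative values only; field names come from the schema above.
from fides.api.schemas.external_storage import ExternalStorageMetadata
from fides.api.schemas.storage.storage import StorageType

metadata = ExternalStorageMetadata(
    storage_type=StorageType.s3,  # assuming the StorageType enum has an s3 member
    file_key="ExampleRecord/abc123/access_data/20250101-120000-000000.txt",
    filesize=25_000_000,
    storage_key="default_storage_config",  # hypothetical storage config key
)

# use_enum_values=True stores the enum as its plain string value, so the
# dumped dict can sit directly in a JSON column.
raw = metadata.model_dump()

# The descriptor recognizes externally stored data by the presence of the
# "storage_type" key and re-validates the dict on read.
restored = ExternalStorageMetadata.model_validate(raw)
assert restored.file_key == metadata.file_key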