Skip to content

Add configurable response key for get_multi method #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
114 changes: 114 additions & 0 deletions docs/usage/response_keys.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Response Key Configuration

FastCRUD allows you to customize the key used for list responses through the `multi_response_key` parameter. This guide demonstrates both default and custom configurations.

## Default Configuration

By default, FastCRUD uses `"data"` as the response key:

```python
from fastcrud import FastCRUD
from .models import MyModel
from .database import session as db

# Default initialization
crud = FastCRUD(MyModel)

# Get multiple items
result = await crud.get_multi(db, limit=2)
```

Response structure:
```json
{
"data": [
{"id": 1, "name": "Item 1"},
{"id": 2, "name": "Item 2"}
],
"total_count": 10,
"has_more": true,
"page": 1,
"items_per_page": 2
}
```

## Custom Response Key

You can customize the response key using `multi_response_key`:

```python
from fastcrud import FastCRUD
from .models import MyModel
from .database import session as db

# Custom response key initialization
crud = FastCRUD(MyModel, multi_response_key="items")

# Get multiple items
result = await crud.get_multi(db, limit=2)
```

Response structure:
```json
{
"items": [
{"id": 1, "name": "Item 1"},
{"id": 2, "name": "Item 2"}
],
"total_count": 10,
"has_more": true,
"page": 1,
"items_per_page": 2
}
```

## Using with EndpointCreator

When using `EndpointCreator`, the response key configuration is automatically inherited:

```python
from fastcrud import FastCRUD, EndpointCreator
from .models import MyModel
from .schemas import CreateSchema, UpdateSchema

# Custom response key
crud = FastCRUD(MyModel, multi_response_key="items")

endpoint_creator = EndpointCreator(
session=async_session,
model=MyModel,
create_schema=CreateSchema,
update_schema=UpdateSchema,
crud=crud # The response key setting is inherited
)
```

The API endpoints created will use the configured response key in their responses.

## Response Models

When using Pydantic response models with custom response keys, make sure to define them accordingly:

```python
from pydantic import BaseModel

# For default "data" key
class DefaultResponse(BaseModel):
data: list[YourSchema]
total_count: int
has_more: bool
page: int | None = None
items_per_page: int | None = None

# For custom "items" key
class CustomResponse(BaseModel):
items: list[YourSchema]
total_count: int
has_more: bool
page: int | None = None
items_per_page: int | None = None
```

!!! note
FastCRUD automatically handles the response model creation when using `EndpointCreator` or `crud_router`,
so manual response model definition is only needed for custom implementations.
6 changes: 4 additions & 2 deletions fastcrud/crud/fast_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,14 @@ def __init__(
is_deleted_column: str = "is_deleted",
deleted_at_column: str = "deleted_at",
updated_at_column: str = "updated_at",
multi_response_key: str = "data",
) -> None:
self.model = model
self.model_col_names = [col.key for col in model.__table__.columns]
self.is_deleted_column = is_deleted_column
self.deleted_at_column = deleted_at_column
self.updated_at_column = updated_at_column
self.multi_response_key = multi_response_key
self._primary_keys = _get_primary_keys(self.model)

def _get_sqlalchemy_filter(
Expand Down Expand Up @@ -1427,7 +1429,7 @@ async def get_multi(
result = await db.execute(stmt)
data = [dict(row) for row in result.mappings()]

response: dict[str, Any] = {"data": data}
response: dict[str, Any] = {self.multi_response_key: data}

if return_total_count:
total_count = await self.count(db=db, **kwargs)
Expand All @@ -1440,7 +1442,7 @@ async def get_multi(
)
try:
model_data = [schema_to_select(**row) for row in data]
response["data"] = model_data
response[self.multi_response_key] = model_data
except ValidationError as e:
raise ValueError(
f"Data validation error for schema {schema_to_select.__name__}: {e}"
Expand Down
1 change: 0 additions & 1 deletion fastcrud/endpoint/crud_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,6 @@ async def add_routes_to_router(self, ...):
# Example GET request: /mymodel/get_multi?id=1&name=example
```
"""

crud = crud or FastCRUD(
model=model,
is_deleted_column=is_deleted_column,
Expand Down
29 changes: 8 additions & 21 deletions fastcrud/endpoint/endpoint_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
_create_dynamic_filters,
_get_column_types,
)
from ..paginated.schemas import create_list_response, create_paginated_response


class EndpointCreator:
Expand Down Expand Up @@ -297,29 +298,14 @@ def __init__(
self.column_types = _get_column_types(model)

if select_schema is not None:
self.list_response_model: Optional[Type[ListResponse[Any]]] = type(
"DynamicListResponse",
(ListResponse[BaseModel],),
{"__annotations__": {"data": list[select_schema]}}, # type: ignore
)
self.paginated_response_model: Optional[
Type[PaginatedListResponse[Any]]
] = type(
"DynamicPaginatedResponse",
(PaginatedListResponse[BaseModel],),
{
"__annotations__": {
"data": list[select_schema], # type: ignore
"total_count": int,
"has_more": bool,
"page": Optional[int],
"items_per_page": Optional[int],
}
},
response_key = getattr(self.crud, "multi_response_key", "data")
self.list_response_model = create_list_response(select_schema, response_key)
self.paginated_response_model = create_paginated_response(
select_schema, response_key
)
else:
self.list_response_model = None
self.paginated_response_model = None
self.list_response_model = None # type: ignore
self.paginated_response_model = None # type: ignore

def _validate_filter_config(self, filter_config: FilterConfig) -> None:
model_columns = self.crud.model_col_names
Expand Down Expand Up @@ -441,6 +427,7 @@ async def endpoint(
crud_data=crud_data,
page=page, # type: ignore
items_per_page=items_per_page, # type: ignore
multi_response_key=self.crud.multi_response_key,
)

return crud_data # pragma: no cover
Expand Down
19 changes: 14 additions & 5 deletions fastcrud/paginated/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,34 @@


def paginated_response(
crud_data: dict, page: int, items_per_page: int
crud_data: dict,
page: int,
items_per_page: int,
multi_response_key: str = "data",
) -> dict[str, Any]:
"""Create a paginated response based on the provided data and pagination parameters.

Args:
crud_data: Data to be paginated, including the list of items and total count.
page: Current page number.
items_per_page: Number of items per page.
multi_response_key: Key to use for the items list in the response (defaults to "data").

Returns:
A structured paginated response dict containing the list of items, total count, pagination flags, and numbers.

Note:
The function does not actually paginate the data but formats the response to indicate pagination metadata.
"""
return {
"data": crud_data["data"],
"total_count": crud_data["total_count"],
"has_more": (page * items_per_page) < crud_data["total_count"],
items = crud_data.get(multi_response_key, [])
total_count = crud_data.get("total_count", 0)

response = {
multi_response_key: items,
"total_count": total_count,
"has_more": (page * items_per_page) < total_count,
"page": page,
"items_per_page": items_per_page,
}

return response
25 changes: 23 additions & 2 deletions fastcrud/paginated/schemas.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
from typing import Generic, TypeVar, Optional
from typing import Generic, TypeVar, Optional, Type

from pydantic import BaseModel
from pydantic import BaseModel, create_model

SchemaType = TypeVar("SchemaType", bound=BaseModel)


def create_list_response(
schema: Type[SchemaType], response_key: str = "data"
) -> Type[BaseModel]:
"""Creates a dynamic ListResponse model with the specified response key."""
return create_model("DynamicListResponse", **{response_key: (list[schema], ...)}) # type: ignore


def create_paginated_response(
schema: Type[SchemaType], response_key: str = "data"
) -> Type[BaseModel]:
"""Creates a dynamic PaginatedResponse model with the specified response key."""
fields = {
response_key: (list[schema], ...), # type: ignore
"total_count": (int, ...),
"has_more": (bool, ...),
"page": (Optional[int], None),
"items_per_page": (Optional[int], None),
}
return create_model("DynamicPaginatedResponse", **fields) # type: ignore


class ListResponse(BaseModel, Generic[SchemaType]):
data: list[SchemaType]

Expand Down
40 changes: 40 additions & 0 deletions tests/sqlalchemy/crud/test_get_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,43 @@ async def test_get_multi_not_filtering(async_session, test_model):
)
# Should only return records with tier_id = 4 or tier_id = 6
assert all(item["tier_id"] in [4, 6] for item in result["data"])


@pytest.mark.asyncio
async def test_get_multi_basic_changed_multi_response_key(
async_session, test_model, test_data
):
for item in test_data:
async_session.add(test_model(**item))
await async_session.commit()

total_count_query = await async_session.execute(
select(func.count()).select_from(test_model)
)
total_count = total_count_query.scalar()

new_multi_response_key = "items"
crud = FastCRUD(test_model, multi_response_key=new_multi_response_key)
result = await crud.get_multi(async_session)

assert len(result[new_multi_response_key]) <= 100
assert result["total_count"] == total_count


@pytest.mark.asyncio
async def test_get_multi_return_model_changed_multi_response_key(
async_session, test_model, test_data, create_schema
):
for item in test_data:
async_session.add(test_model(**item))
await async_session.commit()

new_multi_response_key = "items"
crud = FastCRUD(test_model, multi_response_key=new_multi_response_key)
result = await crud.get_multi(
async_session, return_as_model=True, schema_to_select=create_schema
)

assert all(
isinstance(item, create_schema) for item in result[new_multi_response_key]
)
Empty file.
Loading