Skip to content

feature #2091: AsyncAPI HTTP support #2301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ search:
- [OperationBinding](api/faststream/asyncapi/schema/bindings/amqp/OperationBinding.md)
- [Queue](api/faststream/asyncapi/schema/bindings/amqp/Queue.md)
- [ServerBinding](api/faststream/asyncapi/schema/bindings/amqp/ServerBinding.md)
- http
- [OperationBinding](api/faststream/asyncapi/schema/bindings/http/OperationBinding.md)
- kafka
- [ChannelBinding](api/faststream/asyncapi/schema/bindings/kafka/ChannelBinding.md)
- [OperationBinding](api/faststream/asyncapi/schema/bindings/kafka/OperationBinding.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.asyncapi.schema.bindings.http.OperationBinding
10 changes: 6 additions & 4 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

from faststream._compat import HAS_UVICORN, uvicorn
from faststream._internal.application import Application
from faststream.asgi.factories import make_asyncapi_asgi
from faststream.asgi.response import AsgiResponse
from faststream.asgi.websocket import WebSocketClose
from faststream.exceptions import INSTALL_UVICORN
from faststream.log.logging import logger

from .factories import make_asyncapi_asgi
from .response import AsgiResponse
from .websocket import WebSocketClose

if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
from faststream.asyncapi.schema import (
Contact,
ContactDict,
Expand All @@ -45,6 +45,8 @@
SettingField,
)

from .types import ASGIApp, Receive, Scope, Send


def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
if port := params.get("port"):
Expand Down
32 changes: 27 additions & 5 deletions faststream/asgi/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,47 @@
TYPE_CHECKING,
Any,
Optional,
Sequence,
Union,
)

from faststream.asgi.handlers import get
from faststream.asgi.response import AsgiResponse
from faststream.asyncapi import get_app_schema
from faststream.asyncapi.site import (
ASYNCAPI_CSS_DEFAULT_URL,
ASYNCAPI_JS_DEFAULT_URL,
get_asyncapi_html,
)

from .handlers import get
from .response import AsgiResponse

if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Scope
from faststream.asyncapi.proto import AsyncAPIApplication
from faststream.asyncapi.schema import Tag, TagDict
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import AnyDict

from .types import ASGIApp, Scope


def make_ping_asgi(
broker: "BrokerUsecase[Any, Any]",
/,
timeout: Optional[float] = None,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
) -> "ASGIApp":
healthy_response = AsgiResponse(b"", 204)
unhealthy_response = AsgiResponse(b"", 500)

@get(include_in_schema=include_in_schema)
@get(
include_in_schema=include_in_schema,
description=description,
tags=tags,
unique_id=unique_id,
)
async def ping(scope: "Scope") -> AsgiResponse:
if await broker.ping(timeout):
return healthy_response
Expand All @@ -51,10 +65,18 @@ def make_asyncapi_asgi(
asyncapi_js_url: str = ASYNCAPI_JS_DEFAULT_URL,
asyncapi_css_url: str = ASYNCAPI_CSS_DEFAULT_URL,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
) -> "ASGIApp":
cached_docs = None

@get(include_in_schema=include_in_schema)
@get(
include_in_schema=include_in_schema,
description=description,
tags=tags,
unique_id=unique_id,
)
async def docs(scope: "Scope") -> AsgiResponse:
nonlocal cached_docs
if not cached_docs:
Expand Down
36 changes: 32 additions & 4 deletions faststream/asgi/handlers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Union, overload
from typing import (
TYPE_CHECKING,
Callable,
Optional,
Sequence,
Union,
overload,
)

from faststream.asgi.response import AsgiResponse
from .response import AsgiResponse

if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Receive, Scope, Send, UserApp
from faststream.asyncapi.schema import Tag, TagDict
from faststream.types import AnyDict

from .types import ASGIApp, Receive, Scope, Send, UserApp


class HttpHandler:
Expand All @@ -14,11 +24,15 @@ def __init__(
include_in_schema: bool = True,
description: Optional[str] = None,
methods: Optional[Sequence[str]] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
):
self.func = func
self.methods = methods or ()
self.include_in_schema = include_in_schema
self.description = description or func.__doc__
self.tags = tags
self.unique_id = unique_id

async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
if scope["method"] not in self.methods:
Expand All @@ -41,12 +55,16 @@ def __init__(
*,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
):
super().__init__(
func,
include_in_schema=include_in_schema,
description=description,
methods=("GET", "HEAD"),
tags=tags,
unique_id=unique_id,
)


Expand All @@ -56,6 +74,8 @@ def get(
*,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
) -> "ASGIApp": ...


Expand All @@ -65,6 +85,8 @@ def get(
*,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
) -> Callable[["UserApp"], "ASGIApp"]: ...


Expand All @@ -73,10 +95,16 @@ def get(
*,
include_in_schema: bool = True,
description: Optional[str] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
unique_id: Optional[str] = None,
) -> Union[Callable[["UserApp"], "ASGIApp"], "ASGIApp"]:
def decorator(inner_func: "UserApp") -> "ASGIApp":
return GetHandler(
inner_func, include_in_schema=include_in_schema, description=description
inner_func,
include_in_schema=include_in_schema,
description=description,
tags=tags,
unique_id=unique_id,
)

if func is None:
Expand Down
2 changes: 1 addition & 1 deletion faststream/asgi/response.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, List, Mapping, Optional, Tuple

if TYPE_CHECKING:
from faststream.asgi.types import Receive, Scope, Send
from .types import Receive, Scope, Send


class AsgiResponse:
Expand Down
2 changes: 1 addition & 1 deletion faststream/asgi/websocket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from faststream.asgi.types import Receive, Scope, Send
from .types import Receive, Scope, Send


class WebSocketClose:
Expand Down
36 changes: 25 additions & 11 deletions faststream/asyncapi/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
Components,
Info,
Message,
Operation,
OperationBinding,
Reference,
Schema,
Server,
)
from faststream.asyncapi.schema.bindings import http as http_bindings
from faststream.constants import ContentTypes

if TYPE_CHECKING:
Expand All @@ -26,16 +29,16 @@ def get_app_schema(app: "AsyncAPIApplication") -> Schema:
broker.setup()

servers = get_broker_server(broker)

channels = get_broker_channels(broker)
for ch in channels.values():
ch.servers = list(servers.keys())

# TODO: generate HTTP channels
# asgi_routes = get_asgi_routes(app)
channels.update(get_asgi_routes(app))

messages: Dict[str, Message] = {}
payloads: Dict[str, Dict[str, Any]] = {}
for channel_name, ch in channels.items():
ch.servers = list(servers.keys())

if ch.subscribe is not None:
m = ch.subscribe.message

Expand Down Expand Up @@ -140,9 +143,7 @@ def get_broker_channels(
return channels


def get_asgi_routes(
app: "AsyncAPIApplication",
) -> Any:
def get_asgi_routes(app: "AsyncAPIApplication") -> Dict[str, Channel]:
"""Get the ASGI routes for an application."""
# We should import this here due
# ASGI > Application > asynciapi.proto
Expand All @@ -151,16 +152,29 @@ def get_asgi_routes(
from faststream.asgi.handlers import HttpHandler

if not isinstance(app, AsgiFastStream):
return None
return {}

channels: Dict[str, Channel] = {}
for route in app.routes:
path, asgi_app = route

if isinstance(asgi_app, HttpHandler) and asgi_app.include_in_schema:
# TODO: generate HTTP channel for handler
pass
channel = Channel(
description=asgi_app.description,
subscribe=Operation(
tags=asgi_app.tags,
operationId=asgi_app.unique_id,
bindings=OperationBinding(
http=http_bindings.OperationBinding(
method=", ".join(asgi_app.methods)
)
),
),
)

return
channels[path] = channel

return channels


def _resolve_msg_payloads(
Expand Down
11 changes: 11 additions & 0 deletions faststream/asyncapi/schema/bindings/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""AsyncAPI HTTP bindings.

https://github.com/asyncapi/bindings/tree/master/http
"""

from pydantic import BaseModel


class OperationBinding(BaseModel):
method: str
bindingVersion: str = "0.1.0"
2 changes: 2 additions & 0 deletions faststream/asyncapi/schema/bindings/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from faststream._compat import PYDANTIC_V2
from faststream.asyncapi.schema.bindings import amqp as amqp_bindings
from faststream.asyncapi.schema.bindings import http as http_bindings
from faststream.asyncapi.schema.bindings import kafka as kafka_bindings
from faststream.asyncapi.schema.bindings import nats as nats_bindings
from faststream.asyncapi.schema.bindings import redis as redis_bindings
Expand Down Expand Up @@ -81,6 +82,7 @@ class OperationBinding(BaseModel):
sqs: Optional[sqs_bindings.OperationBinding] = None
nats: Optional[nats_bindings.OperationBinding] = None
redis: Optional[redis_bindings.OperationBinding] = None
http: Optional[http_bindings.OperationBinding] = None

if PYDANTIC_V2:
model_config = {"extra": "allow"}
Expand Down
2 changes: 1 addition & 1 deletion faststream/asyncapi/schema/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Operation(BaseModel):

bindings: Optional[OperationBinding] = None

message: Union[Message, Reference]
message: Union[Message, Reference, None] = None

security: Optional[Dict[str, List[str]]] = None

Expand Down
48 changes: 48 additions & 0 deletions tests/asyncapi/test_asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from faststream.asgi import AsgiFastStream, get, make_ping_asgi
from faststream.asyncapi.generate import get_app_schema
from faststream.kafka import KafkaBroker


def test_asgi():
broker = KafkaBroker()

@get
async def handler(): ...

@get(include_in_schema=False)
async def handler2(): ...

app = AsgiFastStream(
broker,
asgi_routes=[
(
"/test",
make_ping_asgi(
broker,
description="test description",
tags=[{"name": "test"}],
),
),
("/test2", handler),
("/test3", handler2),
],
)

schema = get_app_schema(app).to_jsonable()

assert schema["channels"] == {
"/test": {
"description": "test description",
"subscribe": {
"bindings": {
"http": {"method": "GET, HEAD", "bindingVersion": "0.1.0"}
},
"tags": [{"name": "test"}],
},
},
"/test2": {
"subscribe": {
"bindings": {"http": {"method": "GET, HEAD", "bindingVersion": "0.1.0"}}
}
},
}
Loading