Skip to content

Commit 18816bd

Browse files
feature #2091: AsyncAPI HTTP support (#2301)
* channels_async_routes * docs: generate API References * chore: polish PR * lint: fix CI * docs: generate API References * chore: fix 3.8 compatibility * lint: fix mypy --------- Co-authored-by: Pastukhov Nikita <[email protected]> Co-authored-by: Nikita Pastukhov <[email protected]> Co-authored-by: Lancetnik <[email protected]>
1 parent 301acdc commit 18816bd

File tree

12 files changed

+167
-27
lines changed

12 files changed

+167
-27
lines changed

docs/docs/SUMMARY.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ search:
299299
- [OperationBinding](api/faststream/asyncapi/schema/bindings/amqp/OperationBinding.md)
300300
- [Queue](api/faststream/asyncapi/schema/bindings/amqp/Queue.md)
301301
- [ServerBinding](api/faststream/asyncapi/schema/bindings/amqp/ServerBinding.md)
302+
- http
303+
- [OperationBinding](api/faststream/asyncapi/schema/bindings/http/OperationBinding.md)
302304
- kafka
303305
- [ChannelBinding](api/faststream/asyncapi/schema/bindings/kafka/ChannelBinding.md)
304306
- [OperationBinding](api/faststream/asyncapi/schema/bindings/kafka/OperationBinding.md)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
# 0.5 - API
3+
# 2 - Release
4+
# 3 - Contributing
5+
# 5 - Template Page
6+
# 10 - Default
7+
search:
8+
boost: 0.5
9+
---
10+
11+
::: faststream.asyncapi.schema.bindings.http.OperationBinding

faststream/asgi/app.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
from faststream._compat import HAS_UVICORN, uvicorn
1919
from faststream._internal.application import Application
20-
from faststream.asgi.factories import make_asyncapi_asgi
21-
from faststream.asgi.response import AsgiResponse
22-
from faststream.asgi.websocket import WebSocketClose
2320
from faststream.exceptions import INSTALL_UVICORN
2421
from faststream.log.logging import logger
2522

23+
from .factories import make_asyncapi_asgi
24+
from .response import AsgiResponse
25+
from .websocket import WebSocketClose
26+
2627
if TYPE_CHECKING:
27-
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
2828
from faststream.asyncapi.schema import (
2929
Contact,
3030
ContactDict,
@@ -45,6 +45,8 @@
4545
SettingField,
4646
)
4747

48+
from .types import ASGIApp, Receive, Scope, Send
49+
4850

4951
def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
5052
if port := params.get("port"):

faststream/asgi/factories.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,47 @@
22
TYPE_CHECKING,
33
Any,
44
Optional,
5+
Sequence,
6+
Union,
57
)
68

7-
from faststream.asgi.handlers import get
8-
from faststream.asgi.response import AsgiResponse
99
from faststream.asyncapi import get_app_schema
1010
from faststream.asyncapi.site import (
1111
ASYNCAPI_CSS_DEFAULT_URL,
1212
ASYNCAPI_JS_DEFAULT_URL,
1313
get_asyncapi_html,
1414
)
1515

16+
from .handlers import get
17+
from .response import AsgiResponse
18+
1619
if TYPE_CHECKING:
17-
from faststream.asgi.types import ASGIApp, Scope
1820
from faststream.asyncapi.proto import AsyncAPIApplication
21+
from faststream.asyncapi.schema import Tag, TagDict
1922
from faststream.broker.core.usecase import BrokerUsecase
23+
from faststream.types import AnyDict
24+
25+
from .types import ASGIApp, Scope
2026

2127

2228
def make_ping_asgi(
2329
broker: "BrokerUsecase[Any, Any]",
2430
/,
2531
timeout: Optional[float] = None,
2632
include_in_schema: bool = True,
33+
description: Optional[str] = None,
34+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
35+
unique_id: Optional[str] = None,
2736
) -> "ASGIApp":
2837
healthy_response = AsgiResponse(b"", 204)
2938
unhealthy_response = AsgiResponse(b"", 500)
3039

31-
@get(include_in_schema=include_in_schema)
40+
@get(
41+
include_in_schema=include_in_schema,
42+
description=description,
43+
tags=tags,
44+
unique_id=unique_id,
45+
)
3246
async def ping(scope: "Scope") -> AsgiResponse:
3347
if await broker.ping(timeout):
3448
return healthy_response
@@ -51,10 +65,18 @@ def make_asyncapi_asgi(
5165
asyncapi_js_url: str = ASYNCAPI_JS_DEFAULT_URL,
5266
asyncapi_css_url: str = ASYNCAPI_CSS_DEFAULT_URL,
5367
include_in_schema: bool = True,
68+
description: Optional[str] = None,
69+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
70+
unique_id: Optional[str] = None,
5471
) -> "ASGIApp":
5572
cached_docs = None
5673

57-
@get(include_in_schema=include_in_schema)
74+
@get(
75+
include_in_schema=include_in_schema,
76+
description=description,
77+
tags=tags,
78+
unique_id=unique_id,
79+
)
5880
async def docs(scope: "Scope") -> AsgiResponse:
5981
nonlocal cached_docs
6082
if not cached_docs:

faststream/asgi/handlers.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1-
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Union, overload
1+
from typing import (
2+
TYPE_CHECKING,
3+
Callable,
4+
Optional,
5+
Sequence,
6+
Union,
7+
overload,
8+
)
29

3-
from faststream.asgi.response import AsgiResponse
10+
from .response import AsgiResponse
411

512
if TYPE_CHECKING:
6-
from faststream.asgi.types import ASGIApp, Receive, Scope, Send, UserApp
13+
from faststream.asyncapi.schema import Tag, TagDict
14+
from faststream.types import AnyDict
15+
16+
from .types import ASGIApp, Receive, Scope, Send, UserApp
717

818

919
class HttpHandler:
@@ -14,11 +24,15 @@ def __init__(
1424
include_in_schema: bool = True,
1525
description: Optional[str] = None,
1626
methods: Optional[Sequence[str]] = None,
27+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
28+
unique_id: Optional[str] = None,
1729
):
1830
self.func = func
1931
self.methods = methods or ()
2032
self.include_in_schema = include_in_schema
2133
self.description = description or func.__doc__
34+
self.tags = tags
35+
self.unique_id = unique_id
2236

2337
async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
2438
if scope["method"] not in self.methods:
@@ -41,12 +55,16 @@ def __init__(
4155
*,
4256
include_in_schema: bool = True,
4357
description: Optional[str] = None,
58+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
59+
unique_id: Optional[str] = None,
4460
):
4561
super().__init__(
4662
func,
4763
include_in_schema=include_in_schema,
4864
description=description,
4965
methods=("GET", "HEAD"),
66+
tags=tags,
67+
unique_id=unique_id,
5068
)
5169

5270

@@ -56,6 +74,8 @@ def get(
5674
*,
5775
include_in_schema: bool = True,
5876
description: Optional[str] = None,
77+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
78+
unique_id: Optional[str] = None,
5979
) -> "ASGIApp": ...
6080

6181

@@ -65,6 +85,8 @@ def get(
6585
*,
6686
include_in_schema: bool = True,
6787
description: Optional[str] = None,
88+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
89+
unique_id: Optional[str] = None,
6890
) -> Callable[["UserApp"], "ASGIApp"]: ...
6991

7092

@@ -73,10 +95,16 @@ def get(
7395
*,
7496
include_in_schema: bool = True,
7597
description: Optional[str] = None,
98+
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
99+
unique_id: Optional[str] = None,
76100
) -> Union[Callable[["UserApp"], "ASGIApp"], "ASGIApp"]:
77101
def decorator(inner_func: "UserApp") -> "ASGIApp":
78102
return GetHandler(
79-
inner_func, include_in_schema=include_in_schema, description=description
103+
inner_func,
104+
include_in_schema=include_in_schema,
105+
description=description,
106+
tags=tags,
107+
unique_id=unique_id,
80108
)
81109

82110
if func is None:

faststream/asgi/response.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import TYPE_CHECKING, List, Mapping, Optional, Tuple
22

33
if TYPE_CHECKING:
4-
from faststream.asgi.types import Receive, Scope, Send
4+
from .types import Receive, Scope, Send
55

66

77
class AsgiResponse:

faststream/asgi/websocket.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import TYPE_CHECKING, Optional
22

33
if TYPE_CHECKING:
4-
from faststream.asgi.types import Receive, Scope, Send
4+
from .types import Receive, Scope, Send
55

66

77
class WebSocketClose:

faststream/asyncapi/generate.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
Components,
77
Info,
88
Message,
9+
Operation,
10+
OperationBinding,
911
Reference,
1012
Schema,
1113
Server,
1214
)
15+
from faststream.asyncapi.schema.bindings import http as http_bindings
1316
from faststream.constants import ContentTypes
1417

1518
if TYPE_CHECKING:
@@ -26,16 +29,16 @@ def get_app_schema(app: "AsyncAPIApplication") -> Schema:
2629
broker.setup()
2730

2831
servers = get_broker_server(broker)
32+
2933
channels = get_broker_channels(broker)
34+
for ch in channels.values():
35+
ch.servers = list(servers.keys())
3036

31-
# TODO: generate HTTP channels
32-
# asgi_routes = get_asgi_routes(app)
37+
channels.update(get_asgi_routes(app))
3338

3439
messages: Dict[str, Message] = {}
3540
payloads: Dict[str, Dict[str, Any]] = {}
3641
for channel_name, ch in channels.items():
37-
ch.servers = list(servers.keys())
38-
3942
if ch.subscribe is not None:
4043
m = ch.subscribe.message
4144

@@ -140,9 +143,7 @@ def get_broker_channels(
140143
return channels
141144

142145

143-
def get_asgi_routes(
144-
app: "AsyncAPIApplication",
145-
) -> Any:
146+
def get_asgi_routes(app: "AsyncAPIApplication") -> Dict[str, Channel]:
146147
"""Get the ASGI routes for an application."""
147148
# We should import this here due
148149
# ASGI > Application > asynciapi.proto
@@ -151,16 +152,29 @@ def get_asgi_routes(
151152
from faststream.asgi.handlers import HttpHandler
152153

153154
if not isinstance(app, AsgiFastStream):
154-
return None
155+
return {}
155156

157+
channels: Dict[str, Channel] = {}
156158
for route in app.routes:
157159
path, asgi_app = route
158160

159161
if isinstance(asgi_app, HttpHandler) and asgi_app.include_in_schema:
160-
# TODO: generate HTTP channel for handler
161-
pass
162+
channel = Channel(
163+
description=asgi_app.description,
164+
subscribe=Operation(
165+
tags=asgi_app.tags,
166+
operationId=asgi_app.unique_id,
167+
bindings=OperationBinding(
168+
http=http_bindings.OperationBinding(
169+
method=", ".join(asgi_app.methods)
170+
)
171+
),
172+
),
173+
)
162174

163-
return
175+
channels[path] = channel
176+
177+
return channels
164178

165179

166180
def _resolve_msg_payloads(
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
"""AsyncAPI HTTP bindings.
2+
3+
https://github.com/asyncapi/bindings/tree/master/http
4+
"""
5+
6+
from pydantic import BaseModel
7+
8+
9+
class OperationBinding(BaseModel):
10+
method: str
11+
bindingVersion: str = "0.1.0"

faststream/asyncapi/schema/bindings/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from faststream._compat import PYDANTIC_V2
66
from faststream.asyncapi.schema.bindings import amqp as amqp_bindings
7+
from faststream.asyncapi.schema.bindings import http as http_bindings
78
from faststream.asyncapi.schema.bindings import kafka as kafka_bindings
89
from faststream.asyncapi.schema.bindings import nats as nats_bindings
910
from faststream.asyncapi.schema.bindings import redis as redis_bindings
@@ -81,6 +82,7 @@ class OperationBinding(BaseModel):
8182
sqs: Optional[sqs_bindings.OperationBinding] = None
8283
nats: Optional[nats_bindings.OperationBinding] = None
8384
redis: Optional[redis_bindings.OperationBinding] = None
85+
http: Optional[http_bindings.OperationBinding] = None
8486

8587
if PYDANTIC_V2:
8688
model_config = {"extra": "allow"}

faststream/asyncapi/schema/operations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Operation(BaseModel):
3535

3636
bindings: Optional[OperationBinding] = None
3737

38-
message: Union[Message, Reference]
38+
message: Union[Message, Reference, None] = None
3939

4040
security: Optional[Dict[str, List[str]]] = None
4141

tests/asyncapi/test_asgi.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from faststream.asgi import AsgiFastStream, get, make_ping_asgi
2+
from faststream.asyncapi.generate import get_app_schema
3+
from faststream.kafka import KafkaBroker
4+
5+
6+
def test_asgi():
7+
broker = KafkaBroker()
8+
9+
@get
10+
async def handler(): ...
11+
12+
@get(include_in_schema=False)
13+
async def handler2(): ...
14+
15+
app = AsgiFastStream(
16+
broker,
17+
asgi_routes=[
18+
(
19+
"/test",
20+
make_ping_asgi(
21+
broker,
22+
description="test description",
23+
tags=[{"name": "test"}],
24+
),
25+
),
26+
("/test2", handler),
27+
("/test3", handler2),
28+
],
29+
)
30+
31+
schema = get_app_schema(app).to_jsonable()
32+
33+
assert schema["channels"] == {
34+
"/test": {
35+
"description": "test description",
36+
"subscribe": {
37+
"bindings": {
38+
"http": {"method": "GET, HEAD", "bindingVersion": "0.1.0"}
39+
},
40+
"tags": [{"name": "test"}],
41+
},
42+
},
43+
"/test2": {
44+
"subscribe": {
45+
"bindings": {"http": {"method": "GET, HEAD", "bindingVersion": "0.1.0"}}
46+
}
47+
},
48+
}

0 commit comments

Comments
 (0)