Skip to content

Commit 15cbe6b

Browse files
fix: add pattern checking (#1590)
* fix: add pattern checking * feat: full Kafka Pattern support * chore: bump version --------- Co-authored-by: Nikita Pastukhov <[email protected]> Co-authored-by: Pastukhov Nikita <[email protected]>
1 parent 4d210e0 commit 15cbe6b

File tree

7 files changed

+152
-23
lines changed

7 files changed

+152
-23
lines changed

docs/docs/en/kafka/Subscriber/index.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,18 @@ The function decorated with the `#!python @broker.subscriber(...)` decorator wil
5757
The message will then be injected into the typed `msg` argument of the function, and its type will be used to parse the message.
5858

5959
In this example case, when the message is sent to a `#!python "hello_world"` topic, it will be parsed into a `HelloWorld` class, and the `on_hello_world` function will be called with the parsed class as the `msg` argument value.
60+
61+
### Pattern data access
62+
63+
You can also use pattern subscription feature to encode some data directly in the topic name. With **FastStream** you can easily access this data using the following code:
64+
65+
```python hl_lines="3 6"
66+
from faststream import Path
67+
68+
@broker.subscriber(pattern="logs.{level}")
69+
async def base_handler(
70+
body: str,
71+
level: str = Path(),
72+
):
73+
...
74+
```

faststream/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Simple and fast framework to create message brokers based microservices."""
22

3-
__version__ = "0.5.14"
3+
__version__ = "0.5.15"
44

55
SERVICE_NAME = f"faststream-{__version__}"
66

faststream/kafka/parser.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from faststream.utils.context.repository import context
66

77
if TYPE_CHECKING:
8+
from re import Pattern
9+
810
from aiokafka import ConsumerRecord
911

1012
from faststream.broker.message import StreamMessage
@@ -15,8 +17,13 @@
1517
class AioKafkaParser:
1618
"""A class to parse Kafka messages."""
1719

18-
def __init__(self, msg_class: Type[KafkaMessage]) -> None:
20+
def __init__(
21+
self,
22+
msg_class: Type[KafkaMessage],
23+
regex: Optional["Pattern[str]"],
24+
) -> None:
1925
self.msg_class = msg_class
26+
self.regex = regex
2027

2128
async def parse_message(
2229
self,
@@ -25,6 +32,7 @@ async def parse_message(
2532
"""Parses a Kafka message."""
2633
headers = {i: j.decode() for i, j in message.headers}
2734
handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")
35+
2836
return self.msg_class(
2937
body=message.value,
3038
headers=headers,
@@ -33,6 +41,7 @@ async def parse_message(
3341
message_id=f"{message.offset}-{message.timestamp}",
3442
correlation_id=headers.get("correlation_id", gen_cor_id()),
3543
raw_message=message,
44+
path=self.get_path(message.topic),
3645
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
3746
)
3847

@@ -43,6 +52,12 @@ async def decode_message(
4352
"""Decodes a message."""
4453
return decode_message(msg)
4554

55+
def get_path(self, topic: str) -> Dict[str, str]:
56+
if self.regex and (match := self.regex.match(topic)):
57+
return match.groupdict()
58+
else:
59+
return {}
60+
4661

4762
class AioKafkaBatchParser(AioKafkaParser):
4863
async def parse_message(
@@ -73,6 +88,7 @@ async def parse_message(
7388
message_id=f"{first.offset}-{last.offset}-{first.timestamp}",
7489
correlation_id=headers.get("correlation_id", gen_cor_id()),
7590
raw_message=message,
91+
path=self.get_path(first.topic),
7692
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
7793
)
7894

faststream/kafka/subscriber/usecase.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
)
2929
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage
3030
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser
31+
from faststream.utils.path import compile_path
3132

3233
if TYPE_CHECKING:
3334
from aiokafka import AIOKafkaConsumer, ConsumerRecord
@@ -93,15 +94,16 @@ def __init__(
9394
self.partitions = partitions
9495
self.group_id = group_id
9596

96-
self.builder = None
97-
self.consumer = None
98-
self.task = None
97+
self._pattern = pattern
98+
self.__listener = listener
99+
self.__connection_args = connection_args
99100

100101
# Setup it later
101102
self.client_id = ""
102-
self.__pattern = pattern
103-
self.__listener = listener
104-
self.__connection_args = connection_args
103+
self.builder = None
104+
105+
self.consumer = None
106+
self.task = None
105107

106108
@override
107109
def setup( # type: ignore[override]
@@ -149,10 +151,10 @@ async def start(self) -> None:
149151
**self.__connection_args,
150152
)
151153

152-
if self.topics:
154+
if self.topics or self._pattern:
153155
consumer.subscribe(
154156
topics=self.topics,
155-
pattern=self.__pattern,
157+
pattern=self._pattern,
156158
listener=self.__listener,
157159
)
158160

@@ -229,8 +231,8 @@ def get_routing_hash(
229231

230232
@property
231233
def topic_names(self) -> List[str]:
232-
if self.__pattern:
233-
return [self.__pattern]
234+
if self._pattern:
235+
return [self._pattern]
234236
elif self.topics:
235237
return list(self.topics)
236238
else:
@@ -305,8 +307,19 @@ def __init__(
305307
description_: Optional[str],
306308
include_in_schema: bool,
307309
) -> None:
310+
if pattern:
311+
reg, pattern = compile_path(
312+
pattern,
313+
replace_symbol=".*",
314+
patch_regex=lambda x: x.replace(r"\*", ".*"),
315+
)
316+
317+
else:
318+
reg = None
319+
308320
parser = AioKafkaParser(
309-
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
321+
msg_class=KafkaAckableMessage if is_manual else KafkaMessage,
322+
regex=reg,
310323
)
311324

312325
super().__init__(
@@ -365,8 +378,19 @@ def __init__(
365378
self.batch_timeout_ms = batch_timeout_ms
366379
self.max_records = max_records
367380

381+
if pattern:
382+
reg, pattern = compile_path(
383+
pattern,
384+
replace_symbol=".*",
385+
patch_regex=lambda x: x.replace(r"\*", ".*"),
386+
)
387+
388+
else:
389+
reg = None
390+
368391
parser = AioKafkaBatchParser(
369-
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
392+
msg_class=KafkaAckableMessage if is_manual else KafkaMessage,
393+
regex=reg,
370394
)
371395

372396
super().__init__(

faststream/kafka/testing.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
from datetime import datetime
23
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
34
from unittest.mock import AsyncMock, MagicMock
@@ -16,6 +17,7 @@
1617
if TYPE_CHECKING:
1718
from faststream.broker.wrapper.call import HandlerCallWrapper
1819
from faststream.kafka.publisher.asyncapi import AsyncAPIPublisher
20+
from faststream.kafka.subscriber.usecase import LogicSubscriber
1921
from faststream.types import SendableMessage
2022

2123
__all__ = ("TestKafkaBroker",)
@@ -108,13 +110,7 @@ async def publish( # type: ignore[override]
108110
return_value = None
109111

110112
for handler in self.broker._subscribers.values(): # pragma: no branch
111-
if (
112-
any(
113-
p.topic == topic and (partition is None or p.partition == partition)
114-
for p in handler.partitions
115-
)
116-
or topic in handler.topics
117-
):
113+
if _is_handler_matches(handler, topic, partition):
118114
handle_value = await call_handler(
119115
handler=handler,
120116
message=[incoming]
@@ -141,7 +137,7 @@ async def publish_batch(
141137
) -> None:
142138
"""Publish a batch of messages to the Kafka broker."""
143139
for handler in self.broker._subscribers.values(): # pragma: no branch
144-
if topic in handler.topics:
140+
if _is_handler_matches(handler, topic, partition):
145141
messages = (
146142
build_message(
147143
message=message,
@@ -215,3 +211,18 @@ def _fake_connection(*args: Any, **kwargs: Any) -> AsyncMock:
215211
mock.subscribe = MagicMock
216212
mock.assign = MagicMock
217213
return mock
214+
215+
216+
def _is_handler_matches(
217+
handler: "LogicSubscriber[Any]",
218+
topic: str,
219+
partition: Optional[int],
220+
) -> bool:
221+
return bool(
222+
any(
223+
p.topic == topic and (partition is None or p.partition == partition)
224+
for p in handler.partitions
225+
)
226+
or topic in handler.topics
227+
or (handler._pattern and re.match(handler._pattern, topic))
228+
)

tests/brokers/kafka/test_consume.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,41 @@ class TestConsume(BrokerRealConsumeTestcase):
1616
def get_broker(self, apply_types: bool = False):
1717
return KafkaBroker(apply_types=apply_types)
1818

19+
@pytest.mark.asyncio()
20+
async def test_consume_by_pattern(
21+
self,
22+
queue: str,
23+
event: asyncio.Event,
24+
):
25+
consume_broker = self.get_broker()
26+
27+
@consume_broker.subscriber(queue)
28+
async def handler(msg):
29+
event.set()
30+
31+
pattern_event = asyncio.Event()
32+
33+
@consume_broker.subscriber(pattern=f"{queue[:-1]}*")
34+
async def pattern_handler(msg):
35+
pattern_event.set()
36+
37+
async with self.patch_broker(consume_broker) as br:
38+
await br.start()
39+
40+
await br.publish(1, topic=queue)
41+
42+
await asyncio.wait(
43+
(
44+
asyncio.create_task(br.publish(1, topic=queue)),
45+
asyncio.create_task(event.wait()),
46+
asyncio.create_task(pattern_event.wait()),
47+
),
48+
timeout=3,
49+
)
50+
51+
assert event.is_set()
52+
assert pattern_event.is_set()
53+
1954
@pytest.mark.asyncio()
2055
async def test_consume_batch(self, queue: str):
2156
consume_broker = self.get_broker()

tests/utils/context/test_path.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,35 @@
44
import pytest
55

66
from faststream import Path
7-
from tests.marks import require_aiopika, require_nats, require_redis
7+
from tests.marks import require_aiokafka, require_aiopika, require_nats, require_redis
8+
9+
10+
@pytest.mark.asyncio()
11+
@require_aiokafka
12+
async def test_aiokafka_path():
13+
from faststream.kafka import KafkaBroker, TestKafkaBroker
14+
15+
broker = KafkaBroker()
16+
17+
@broker.subscriber(pattern="in.{name}.{id}")
18+
async def h(
19+
name: str = Path(),
20+
id_: int = Path("id"),
21+
):
22+
assert name == "john"
23+
assert id_ == 1
24+
return 1
25+
26+
async with TestKafkaBroker(broker) as br:
27+
assert (
28+
await br.publish(
29+
"",
30+
"in.john.1",
31+
rpc=True,
32+
rpc_timeout=1.0,
33+
)
34+
== 1
35+
)
836

937

1038
@pytest.mark.asyncio()

0 commit comments

Comments
 (0)