Skip to content

Commit 6d4f55c

Browse files
committed
divided redis message format into different classes and added arguments for choosing format
1 parent f5f3a13 commit 6d4f55c

File tree

9 files changed

+264
-82
lines changed

9 files changed

+264
-82
lines changed

faststream/redis/broker/broker.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from faststream.exceptions import NOT_CONNECTED_YET
3333
from faststream.redis.broker.logging import RedisLoggingBroker
3434
from faststream.redis.broker.registrator import RedisRegistrator
35+
from faststream.redis.parser import JSONMessageFormat, MessageFormat
3536
from faststream.redis.publisher.producer import RedisFastProducer
3637
from faststream.redis.security import parse_security
3738
from faststream.types import EMPTY
@@ -139,6 +140,10 @@ def __init__(
139140
Sequence["BrokerMiddleware[BaseMessage]"],
140141
Doc("Middlewares to apply to all broker publishers/subscribers."),
141142
] = (),
143+
message_format: Annotated[
144+
Type["MessageFormat"],
145+
Doc("What format to use when parsing messages"),
146+
] = JSONMessageFormat,
142147
# AsyncAPI args
143148
security: Annotated[
144149
Optional["BaseSecurity"],
@@ -202,6 +207,7 @@ def __init__(
202207
] = (),
203208
) -> None:
204209
self._producer = None
210+
self.message_format = message_format
205211

206212
if asyncapi_url is None:
207213
asyncapi_url = url
@@ -348,6 +354,7 @@ async def _connect( # type: ignore[override]
348354
connection=client,
349355
parser=self._parser,
350356
decoder=self._decoder,
357+
message_format=self.message_format,
351358
)
352359
return client
353360

faststream/redis/broker/registrator.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
1-
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Sequence, Union, cast
1+
from typing import (
2+
TYPE_CHECKING,
3+
Any,
4+
Dict,
5+
Iterable,
6+
Optional,
7+
Sequence,
8+
Type,
9+
Union,
10+
cast,
11+
)
212

313
from typing_extensions import Annotated, Doc, deprecated, override
414

515
from faststream.broker.core.abc import ABCBroker
616
from faststream.broker.utils import default_filter
717
from faststream.exceptions import SetupError
818
from faststream.redis.message import UnifyRedisDict
19+
from faststream.redis.parser import JSONMessageFormat, MessageFormat
920
from faststream.redis.publisher.asyncapi import AsyncAPIPublisher
1021
from faststream.redis.subscriber.asyncapi import AsyncAPISubscriber
1122
from faststream.redis.subscriber.factory import SubsciberType, create_subscriber
@@ -97,6 +108,10 @@ def subscriber( # type: ignore[override]
97108
"Whether to disable **FastStream** RPC and Reply To auto responses or not."
98109
),
99110
] = False,
111+
message_format: Annotated[
112+
Type["MessageFormat"],
113+
Doc("What format to use when parsing messages"),
114+
] = JSONMessageFormat,
100115
# AsyncAPI information
101116
title: Annotated[
102117
Optional[str],
@@ -122,6 +137,7 @@ def subscriber( # type: ignore[override]
122137
list=list,
123138
stream=stream,
124139
# subscriber args
140+
message_format=message_format,
125141
no_ack=no_ack,
126142
no_reply=no_reply,
127143
retry=retry,

faststream/redis/parser.py

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import enum
2-
import warnings
2+
from abc import ABC, abstractmethod
33
from struct import pack, unpack
44
from typing import (
55
TYPE_CHECKING,
@@ -92,18 +92,14 @@ def read_bytes(self) -> bytes:
9292
return self.data[self.offset :]
9393

9494

95-
class RawMessage:
95+
class MessageFormat(ABC):
9696
"""A class to represent a raw Redis message."""
9797

9898
__slots__ = (
9999
"data",
100100
"headers",
101101
)
102102

103-
IDENTITY_HEADER = (
104-
b"\x89BIN\x0d\x0a\x1a\x0a" # to avoid confusion with other formats
105-
)
106-
107103
def __init__(
108104
self,
109105
data: bytes,
@@ -120,7 +116,7 @@ def build(
120116
reply_to: Optional[str],
121117
headers: Optional["AnyDict"],
122118
correlation_id: str,
123-
) -> "RawMessage":
119+
) -> "MessageFormat":
124120
payload, content_type = encode_message(message)
125121

126122
headers_to_send = {
@@ -141,6 +137,25 @@ def build(
141137
headers=headers_to_send,
142138
)
143139

140+
@classmethod
141+
@abstractmethod
142+
def encode(
143+
cls,
144+
*,
145+
message: Union[Sequence["SendableMessage"], "SendableMessage"],
146+
reply_to: Optional[str],
147+
headers: Optional["AnyDict"],
148+
correlation_id: str,
149+
) -> bytes:
150+
raise NotImplementedError()
151+
152+
@classmethod
153+
@abstractmethod
154+
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
155+
raise NotImplementedError()
156+
157+
158+
class JSONMessageFormat(MessageFormat):
144159
@classmethod
145160
def encode(
146161
cls,
@@ -149,29 +164,54 @@ def encode(
149164
reply_to: Optional[str],
150165
headers: Optional["AnyDict"],
151166
correlation_id: str,
152-
to_json: bool = False,
153167
) -> bytes:
154168
msg = cls.build(
155169
message=message,
156170
reply_to=reply_to,
157171
headers=headers,
158172
correlation_id=correlation_id,
159173
)
160-
if to_json:
161-
warnings.warn(
162-
"JSON encoding deprecated in **FastStream 0.6.0**. "
163-
"Please don't use it unless it's necessary"
164-
"Format will be removed in **FastStream 0.7.0**.",
165-
DeprecationWarning,
166-
stacklevel=2,
167-
)
168-
return dump_json(
169-
{
170-
"data": msg.data,
171-
"headers": msg.headers,
172-
}
173-
)
174+
return dump_json(
175+
{
176+
"data": msg.data,
177+
"headers": msg.headers,
178+
}
179+
)
180+
181+
@classmethod
182+
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
183+
headers: AnyDict
184+
try:
185+
parsed_data = json_loads(data)
186+
data = parsed_data["data"].encode()
187+
headers = parsed_data["headers"]
188+
except Exception:
189+
# Raw Redis message format
190+
data = data
191+
headers = {}
192+
return data, headers
193+
194+
195+
class BinaryMessageFormatV1(MessageFormat):
196+
IDENTITY_HEADER = (
197+
b"\x89BIN\x0d\x0a\x1a\x0a" # to avoid confusion with other formats
198+
)
174199

200+
@classmethod
201+
def encode(
202+
cls,
203+
*,
204+
message: Union[Sequence["SendableMessage"], "SendableMessage"],
205+
reply_to: Optional[str],
206+
headers: Optional["AnyDict"],
207+
correlation_id: str,
208+
) -> bytes:
209+
msg = cls.build(
210+
message=message,
211+
reply_to=reply_to,
212+
headers=headers,
213+
correlation_id=correlation_id,
214+
)
175215
writer = BinaryWriter()
176216
writer.write(cls.IDENTITY_HEADER)
177217
writer.write_int(FastStreamMessageVersion.v1.value)
@@ -185,41 +225,26 @@ def encode(
185225
@classmethod
186226
def parse(cls, data: bytes) -> Tuple[bytes, "AnyDict"]:
187227
headers: AnyDict
188-
189-
# FastStream message format
190228
try:
191229
reader = BinaryReader(data)
230+
headers = {}
192231
magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
193232
message_version = reader.read_int()
194233
if (
195234
magic_header == cls.IDENTITY_HEADER
196235
and message_version == FastStreamMessageVersion.v1.value
197236
):
198237
header_count = reader.read_int()
199-
headers = {}
200238
for _ in range(header_count):
201239
key = reader.read_string()
202240
value = reader.read_string()
203241
headers[key] = value
204242

205243
data = reader.read_bytes()
206-
else:
207-
# JSON message format
208-
parsed_data = json_loads(data)
209-
data = parsed_data["data"].decode()
210-
headers = parsed_data["headers"]
211-
warnings.warn(
212-
"JSON decoding deprecated in **FastStream 0.6.0**. "
213-
"Please don't use it unless it's necessary"
214-
"Format will be removed in **FastStream 0.7.0**.",
215-
DeprecationWarning,
216-
stacklevel=2,
217-
)
218244
except Exception:
219245
# Raw Redis message format
220246
data = data
221247
headers = {}
222-
223248
return data, headers
224249

225250

@@ -228,9 +253,11 @@ class SimpleParser:
228253

229254
def __init__(
230255
self,
256+
message_format: Type["MessageFormat"] = JSONMessageFormat,
231257
pattern: Optional["Pattern[str]"] = None,
232258
) -> None:
233259
self.pattern = pattern
260+
self.message_format = message_format
234261

235262
async def parse_message(
236263
self,
@@ -256,7 +283,7 @@ def _parse_data(
256283
self,
257284
message: Mapping[str, Any],
258285
) -> Tuple[bytes, "AnyDict", List["AnyDict"]]:
259-
return (*RawMessage.parse(message["data"]), [])
286+
return (*self.message_format.parse(message["data"]), [])
260287

261288
def get_path(self, message: Mapping[str, Any]) -> "AnyDict":
262289
if (
@@ -295,7 +322,7 @@ def _parse_data(
295322
batch_headers: List[AnyDict] = []
296323

297324
for x in message["data"]:
298-
msg_data, msg_headers = _decode_batch_body_item(x)
325+
msg_data, msg_headers = _decode_batch_body_item(x, self.message_format)
299326
body.append(msg_data)
300327
batch_headers.append(msg_headers)
301328

@@ -319,7 +346,7 @@ def _parse_data(
319346
cls, message: Mapping[str, Any]
320347
) -> Tuple[bytes, "AnyDict", List["AnyDict"]]:
321348
data = message["data"]
322-
return (*RawMessage.parse(data.get(bDATA_KEY) or dump_json(data)), [])
349+
return (*JSONMessageFormat.parse(data.get(bDATA_KEY) or dump_json(data)), [])
323350

324351

325352
class RedisBatchStreamParser(SimpleParser):
@@ -333,7 +360,9 @@ def _parse_data(
333360
batch_headers: List[AnyDict] = []
334361

335362
for x in message["data"]:
336-
msg_data, msg_headers = _decode_batch_body_item(x.get(bDATA_KEY, x))
363+
msg_data, msg_headers = _decode_batch_body_item(
364+
x.get(bDATA_KEY, x), self.message_format
365+
)
337366
body.append(msg_data)
338367
batch_headers.append(msg_headers)
339368

@@ -349,8 +378,10 @@ def _parse_data(
349378
)
350379

351380

352-
def _decode_batch_body_item(msg_content: bytes) -> Tuple[Any, "AnyDict"]:
353-
msg_body, headers = RawMessage.parse(msg_content)
381+
def _decode_batch_body_item(
382+
msg_content: bytes, message_format: Type["MessageFormat"]
383+
) -> Tuple[Any, "AnyDict"]:
384+
msg_body, headers = message_format.parse(msg_content)
354385
try:
355386
return json_loads(msg_body), headers
356387
except Exception:

faststream/redis/publisher/producer.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import TYPE_CHECKING, Any, Optional
1+
from typing import TYPE_CHECKING, Any, Optional, Type
22

33
import anyio
44
from typing_extensions import override
@@ -7,7 +7,7 @@
77
from faststream.broker.utils import resolve_custom_func
88
from faststream.exceptions import WRONG_PUBLISH_ARGS, SetupError
99
from faststream.redis.message import DATA_KEY
10-
from faststream.redis.parser import RawMessage, RedisPubSubParser
10+
from faststream.redis.parser import JSONMessageFormat, MessageFormat, RedisPubSubParser
1111
from faststream.redis.schemas import INCORRECT_SETUP_MSG
1212
from faststream.utils.functions import timeout_scope
1313
from faststream.utils.nuid import NUID
@@ -34,10 +34,12 @@ def __init__(
3434
connection: "Redis[bytes]",
3535
parser: Optional["CustomCallable"],
3636
decoder: Optional["CustomCallable"],
37+
message_format: Type["MessageFormat"] = JSONMessageFormat,
3738
) -> None:
3839
self._connection = connection
40+
self.message_format = message_format
3941

40-
default = RedisPubSubParser()
42+
default = RedisPubSubParser(message_format=message_format)
4143
self._parser = resolve_custom_func(
4244
parser,
4345
default.parse_message,
@@ -83,7 +85,7 @@ async def publish( # type: ignore[override]
8385
psub = self._connection.pubsub()
8486
await psub.subscribe(reply_to)
8587

86-
msg = RawMessage.encode(
88+
msg = self.message_format.encode(
8789
message=message,
8890
reply_to=reply_to,
8991
headers=headers,
@@ -154,7 +156,7 @@ async def request( # type: ignore[override]
154156
psub = self._connection.pubsub()
155157
await psub.subscribe(reply_to)
156158

157-
msg = RawMessage.encode(
159+
msg = self.message_format.encode(
158160
message=message,
159161
reply_to=reply_to,
160162
headers=headers,
@@ -204,7 +206,7 @@ async def publish_batch(
204206
pipeline: Optional["Pipeline[bytes]"] = None,
205207
) -> None:
206208
batch = (
207-
RawMessage.encode(
209+
self.message_format.encode(
208210
message=msg,
209211
correlation_id=correlation_id,
210212
reply_to=None,

0 commit comments

Comments
 (0)