Skip to content

Commit 1862620

Browse files
authored
add typing to aiokafka/protocol/* (#999)
* add typing to aiokafka/protocol/* * fix review * fix VarInt64 * fix review tuple -> list * fix review * fix review * move ALL_TOPICS/NO_TOPICS to docs * remove default values from Message() * fix checking abstractproperty in test * fix review * fix review (from docstrings to comments) * fix: collections.abc.Sequence -> typing.Sequence * fix review: Message * add FIXME * fix review: Message * use NotImplemented instead of False
1 parent 2bba153 commit 1862620

File tree

11 files changed

+415
-219
lines changed

11 files changed

+415
-219
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ FORMATTED_AREAS=\
1111
aiokafka/helpers.py \
1212
aiokafka/structs.py \
1313
aiokafka/util.py \
14+
aiokafka/protocol/ \
1415
tests/test_codec.py \
1516
tests/test_helpers.py
1617

aiokafka/protocol/abstract.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
import abc
2+
from io import BytesIO
3+
from typing import Generic, TypeVar
24

5+
T = TypeVar("T")
36

4-
class AbstractType(metaclass=abc.ABCMeta):
7+
8+
class AbstractType(Generic[T], metaclass=abc.ABCMeta):
59
@classmethod
610
@abc.abstractmethod
7-
def encode(cls, value): ...
11+
def encode(cls, value: T) -> bytes: ...
812

913
@classmethod
1014
@abc.abstractmethod
11-
def decode(cls, data): ...
15+
def decode(cls, data: BytesIO) -> T: ...
1216

1317
@classmethod
14-
def repr(cls, value):
18+
def repr(cls, value: T) -> str:
1519
return repr(value)

aiokafka/protocol/admin.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Dict, Iterable, Optional, Tuple
2+
13
from .api import Request, Response
24
from .types import (
35
Array,
@@ -429,8 +431,8 @@ class DescribeGroupsResponse_v3(Response):
429431
("member_assignment", Bytes),
430432
),
431433
),
434+
("authorized_operations", Int32),
432435
),
433-
("authorized_operations", Int32),
434436
),
435437
)
436438

@@ -1119,7 +1121,7 @@ class DeleteGroupsRequest_v1(Request):
11191121
DeleteGroupsResponse = [DeleteGroupsResponse_v0, DeleteGroupsResponse_v1]
11201122

11211123

1122-
class DescribeClientQuotasResponse_v0(Request):
1124+
class DescribeClientQuotasResponse_v0(Response):
11231125
API_KEY = 48
11241126
API_VERSION = 0
11251127
SCHEMA = Schema(
@@ -1385,7 +1387,12 @@ class DeleteRecordsRequest_v2(Request):
13851387
("tags", TaggedFields),
13861388
)
13871389

1388-
def __init__(self, topics, timeout_ms, tags=None):
1390+
def __init__(
1391+
self,
1392+
topics: Iterable[Tuple[str, Iterable[Tuple[int, int]]]],
1393+
timeout_ms: int,
1394+
tags: Optional[Dict[int, bytes]] = None,
1395+
) -> None:
13891396
super().__init__(
13901397
[
13911398
(

aiokafka/protocol/api.py

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
from __future__ import annotations
2+
13
import abc
4+
from io import BytesIO
5+
from typing import Any, ClassVar, Dict, Optional, Type, Union
26

37
from .struct import Struct
48
from .types import Array, Int16, Int32, Schema, String, TaggedFields
@@ -12,7 +16,9 @@ class RequestHeader_v0(Struct):
1216
("client_id", String("utf-8")),
1317
)
1418

15-
def __init__(self, request, correlation_id=0, client_id="aiokafka"):
19+
def __init__(
20+
self, request: Request, correlation_id: int = 0, client_id: str = "aiokafka"
21+
) -> None:
1622
super().__init__(
1723
request.API_KEY, request.API_VERSION, correlation_id, client_id
1824
)
@@ -28,7 +34,13 @@ class RequestHeader_v1(Struct):
2834
("tags", TaggedFields),
2935
)
3036

31-
def __init__(self, request, correlation_id=0, client_id="aiokafka", tags=None):
37+
def __init__(
38+
self,
39+
request: Request,
40+
correlation_id: int = 0,
41+
client_id: str = "aiokafka",
42+
tags: Optional[Dict[int, bytes]] = None,
43+
):
3244
super().__init__(
3345
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
3446
)
@@ -48,32 +60,38 @@ class ResponseHeader_v1(Struct):
4860

4961

5062
class Request(Struct, metaclass=abc.ABCMeta):
51-
FLEXIBLE_VERSION = False
63+
FLEXIBLE_VERSION: ClassVar[bool] = False
5264

53-
@abc.abstractproperty
54-
def API_KEY(self):
65+
@property
66+
@abc.abstractmethod
67+
def API_KEY(self) -> int:
5568
"""Integer identifier for api request"""
5669

57-
@abc.abstractproperty
58-
def API_VERSION(self):
70+
@property
71+
@abc.abstractmethod
72+
def API_VERSION(self) -> int:
5973
"""Integer of api request version"""
6074

61-
@abc.abstractproperty
62-
def SCHEMA(self):
63-
"""An instance of Schema() representing the request structure"""
64-
65-
@abc.abstractproperty
66-
def RESPONSE_TYPE(self):
75+
@property
76+
@abc.abstractmethod
77+
def RESPONSE_TYPE(self) -> Type[Response]:
6778
"""The Response class associated with the api request"""
6879

69-
def expect_response(self):
80+
@property
81+
@abc.abstractmethod
82+
def SCHEMA(self) -> Schema:
83+
"""An instance of Schema() representing the request structure"""
84+
85+
def expect_response(self) -> bool:
7086
"""Override this method if an api request does not always generate a response"""
7187
return True
7288

73-
def to_object(self):
89+
def to_object(self) -> Dict[str, Any]:
7490
return _to_object(self.SCHEMA, self)
7591

76-
def build_request_header(self, correlation_id, client_id):
92+
def build_request_header(
93+
self, correlation_id: int, client_id: str
94+
) -> Union[RequestHeader_v0, RequestHeader_v1]:
7795
if self.FLEXIBLE_VERSION:
7896
return RequestHeader_v1(
7997
self, correlation_id=correlation_id, client_id=client_id
@@ -82,31 +100,36 @@ def build_request_header(self, correlation_id, client_id):
82100
self, correlation_id=correlation_id, client_id=client_id
83101
)
84102

85-
def parse_response_header(self, read_buffer):
103+
def parse_response_header(
104+
self, read_buffer: Union[BytesIO, bytes]
105+
) -> Union[ResponseHeader_v0, ResponseHeader_v1]:
86106
if self.FLEXIBLE_VERSION:
87107
return ResponseHeader_v1.decode(read_buffer)
88108
return ResponseHeader_v0.decode(read_buffer)
89109

90110

91111
class Response(Struct, metaclass=abc.ABCMeta):
92-
@abc.abstractproperty
93-
def API_KEY(self):
112+
@property
113+
@abc.abstractmethod
114+
def API_KEY(self) -> int:
94115
"""Integer identifier for api request/response"""
95116

96-
@abc.abstractproperty
97-
def API_VERSION(self):
117+
@property
118+
@abc.abstractmethod
119+
def API_VERSION(self) -> int:
98120
"""Integer of api request/response version"""
99121

100-
@abc.abstractproperty
101-
def SCHEMA(self):
122+
@property
123+
@abc.abstractmethod
124+
def SCHEMA(self) -> Schema:
102125
"""An instance of Schema() representing the response structure"""
103126

104-
def to_object(self):
127+
def to_object(self) -> Dict[str, Any]:
105128
return _to_object(self.SCHEMA, self)
106129

107130

108-
def _to_object(schema, data):
109-
obj = {}
131+
def _to_object(schema: Schema, data: Union[Struct, Dict[int, Any]]) -> Dict[str, Any]:
132+
obj: Dict[str, Any] = {}
110133
for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
111134
if isinstance(data, Struct):
112135
val = data.get_item(name)
@@ -116,7 +139,7 @@ def _to_object(schema, data):
116139
if isinstance(_type, Schema):
117140
obj[name] = _to_object(_type, val)
118141
elif isinstance(_type, Array):
119-
if isinstance(_type.array_of, (Array, Schema)):
142+
if isinstance(_type.array_of, Schema):
120143
obj[name] = [_to_object(_type.array_of, x) for x in val]
121144
else:
122145
obj[name] = val

aiokafka/protocol/fetch.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ class FetchRequest_v7(Request):
376376
),
377377
(
378378
"forgotten_topics_data",
379-
Array(("topic", String), ("partitions", Array(Int32))),
379+
Array(("topic", String("utf-8")), ("partitions", Array(Int32))),
380380
),
381381
)
382382

@@ -428,7 +428,7 @@ class FetchRequest_v9(Request):
428428
(
429429
"forgotten_topics_data",
430430
Array(
431-
("topic", String),
431+
("topic", String("utf-8")),
432432
("partitions", Array(Int32)),
433433
),
434434
),
@@ -480,7 +480,7 @@ class FetchRequest_v11(Request):
480480
),
481481
(
482482
"forgotten_topics_data",
483-
Array(("topic", String), ("partitions", Array(Int32))),
483+
Array(("topic", String("utf-8")), ("partitions", Array(Int32))),
484484
),
485485
("rack_id", String("utf-8")),
486486
)

0 commit comments

Comments
 (0)