
Commit 331f6d5

Update ruff to 0.3.2, apply more linting rules (#986)
* Update ruff to 0.3.0
* Apply pyupgrade rules
* Apply flake8-2020, flake8-async
* Partially apply flake8-bandit, flake8-blind-except, flake8-bugbear
* Apply flake8-builtins, flake8-comprehensions
* Apply flake8-future-annotations and other flake rules
* Apply flake8-logging-format and others
* Apply flake8-no-pep420, flake8-pie
* Apply flake8-print
* Ignores for the rest of flake8-bandit
* Ignores for the rest of BLE; remove pylint ignores
* Fix problems found by flake8-bugbear
* Apply flake8-return
* Apply flake8-simplify
* Apply pygrep-hooks
* Apply Pylint rules
* Apply tryceratops
* Apply Perflint
* Apply flake8-logging, Ruff-specific rules
* Update ruff to 0.3.2
* Fix error found by github-advanced-security
* Return back to runtime-valid annotations
* Modern way to use metaclass
* Fix bootstrap_servers annotation
* Remove broken __hash__ method
1 parent d712413 commit 331f6d5


84 files changed: +765 -857 lines
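Most of the 84 files follow a handful of mechanical patterns from the rule sets listed above. As a rough before/after sketch of two of the most common rewrites — pyupgrade's UP032 (f-strings) and flake8-comprehensions' C403 (set comprehensions) — using hypothetical values rather than code from this commit:

node_id = 3  # hypothetical stand-in value
partitions = []  # hypothetical stand-in data

# Before: str.format() call, and set() wrapped around a list comprehension
msg = "node id {}".format(node_id)
leaders = set([p for p in partitions if p.leader != -1])

# After: UP032 rewrites .format() into an f-string,
# and C403 collapses set([...]) into a set comprehension
msg = f"node id {node_id}"
leaders = {p for p in partitions if p.leader != -1}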

aiokafka/__init__.py

+2 -3

@@ -1,4 +1,4 @@
-__version__ = "0.10.0"  # noqa
+__version__ = "0.10.0"

 from .abc import ConsumerRebalanceListener
 from .client import AIOKafkaClient
@@ -16,6 +16,7 @@
     # Clients API
     "AIOKafkaProducer",
     "AIOKafkaConsumer",
+    "AIOKafkaClient",
     # ABC's
     "ConsumerRebalanceListener",
     # Errors
@@ -27,5 +28,3 @@
     "OffsetAndTimestamp",
     "OffsetAndMetadata",
 ]
-
-AIOKafkaClient
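The deleted trailing AIOKafkaClient was a bare expression statement: it evaluated the imported name and discarded the result. Listing the name in __all__ instead, as this hunk does, is the idiomatic way to mark a public re-export. A minimal sketch of the pattern in a hypothetical package:

# mypkg/__init__.py (hypothetical)
from .client import Client

__all__ = [
    "Client",  # explicit public re-export, replacing a bare `Client` statement
]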

aiokafka/abc.py

+2 -8

@@ -63,7 +63,6 @@ def on_partitions_revoked(self, revoked):
             revoked (list(TopicPartition)): the partitions that were assigned
                 to the consumer on the last rebalance
         """
-        pass

     @abc.abstractmethod
     def on_partitions_assigned(self, assigned):
@@ -83,7 +82,6 @@ def on_partitions_assigned(self, assigned):
             assigned (list(TopicPartition)): the partitions assigned to the
                 consumer (may include partitions that were previously assigned)
         """
-        pass


 class AbstractTokenProvider(abc.ABC):
@@ -104,14 +102,11 @@ class AbstractTokenProvider(abc.ABC):
     https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html
     """

-    def __init__(self, **config):
-        pass
-
     @abc.abstractmethod
     async def token(self):
         """
         An async callback returning a :class:`str` ID/Access Token to be sent to
-        the Kafka client. In case where a synchoronous callback is needed,
+        the Kafka client. In case where a synchronous callback is needed,
         implementations like following can be used:

         .. code-block:: python
@@ -124,9 +119,8 @@ async def token(self):
                     None, self._token)

             def _token(self):
-                # The actual synchoronous token callback.
+                # The actual synchronous token callback.
         """
-        pass

     def extensions(self):
         """

aiokafka/admin/client.py

+13 -22

@@ -2,7 +2,7 @@
 import logging
 from collections import defaultdict
 from ssl import SSLContext
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple, Union

 from aiokafka import __version__
 from aiokafka.client import AIOKafkaClient
@@ -85,7 +85,7 @@ def __init__(
         self,
         *,
         loop=None,
-        bootstrap_servers: str = "localhost",
+        bootstrap_servers: Union[str, List[str]] = "localhost",
         client_id: str = "aiokafka-" + __version__,
         request_timeout_ms: int = 40000,
         connections_max_idle_ms: int = 540000,
@@ -171,16 +171,15 @@ def _matching_api_version(self, operation: List[Request]) -> int:
         api_key = operation[0].API_KEY
         if not self._version_info or api_key not in self._version_info:
             raise IncompatibleBrokerVersion(
-                "Kafka broker does not support the '{}' Kafka protocol.".format(
-                    operation[0].__name__
-                )
+                f"Kafka broker does not support the '{operation[0].__name__}' "
+                "Kafka protocol."
             )
         min_version, max_version = self._version_info[api_key]
         version = min(len(operation) - 1, max_version)
         if version < min_version:
             raise IncompatibleBrokerVersion(
-                "No version of the '{}' Kafka protocol is supported by "
-                "both the client and broker.".format(operation[0].__name__)
+                f"No version of the '{operation[0].__name__}' Kafka protocol "
+                "is supported by both the client and broker."
             )
         return version

@@ -190,14 +189,8 @@ def _convert_new_topic_request(new_topic):
             new_topic.name,
             new_topic.num_partitions,
             new_topic.replication_factor,
-            [
-                (partition_id, replicas)
-                for partition_id, replicas in new_topic.replica_assignments.items()
-            ],
-            [
-                (config_key, config_value)
-                for config_key, config_value in new_topic.topic_configs.items()
-            ],
+            list(new_topic.replica_assignments.items()),
+            list(new_topic.topic_configs.items()),
         )

     async def create_topics(
@@ -223,9 +216,7 @@
            if validate_only:
                raise IncompatibleBrokerVersion(
                    "validate_only requires CreateTopicsRequest >= v1, "
-                   "which is not supported by Kafka {}.".format(
-                       self._client.api_version
-                   )
+                   f"which is not supported by Kafka {self._client.api_version}."
                )
            request = CreateTopicsRequest[version](
                create_topic_requests=topics,
@@ -239,8 +230,8 @@
            )
        else:
            raise NotImplementedError(
-               "Support for CreateTopics v{} has not yet been added "
-               "to AIOKafkaAdminClient.".format(version)
+               f"Support for CreateTopics v{version} has not yet been added "
+               "to AIOKafkaAdminClient."
            )
        response = await self._client.send(self._client.get_random_node(), request)
        return response
@@ -317,7 +308,7 @@ async def describe_configs(
         if version == 0 and include_synonyms:
             raise IncompatibleBrokerVersion(
                 "include_synonyms requires DescribeConfigsRequest >= v1,"
-                " which is not supported by Kafka {}.".format(self._client.api_version)
+                f" which is not supported by Kafka {self._client.api_version}."
             )
         broker_res, topic_res = self._convert_config_resources(
             config_resources,
@@ -462,7 +453,7 @@ async def describe_consumer_groups(
             raise IncompatibleBrokerVersion(
                 "include_authorized_operations requests "
                 "DescribeGroupsRequest >= v3, which is not "
-                "supported by Kafka {}".format(version)
+                f"supported by Kafka {version}"
             )
         req_class = DescribeGroupsRequest[version]
         futures = []
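A detail these f-string rewrites rely on: adjacent string literals are concatenated at compile time, so in a multi-line message only the fragment that interpolates needs the f prefix. A standalone sketch with a hypothetical version value:

version = 2  # hypothetical value

message = (
    f"Support for CreateTopics v{version} has not yet been added "
    "to AIOKafkaAdminClient."  # plain literal, concatenated with the f-string above
)
assert message == (
    "Support for CreateTopics v2 has not yet been added to AIOKafkaAdminClient."
)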

aiokafka/client.py

+19 -25

@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import logging
 import random
 import time
@@ -188,16 +189,14 @@ def hosts(self):
     async def close(self):
         if self._sync_task:
             self._sync_task.cancel()
-            try:
+            with contextlib.suppress(asyncio.CancelledError):
                 await self._sync_task
-            except asyncio.CancelledError:
-                pass
             self._sync_task = None
         # Be careful to wait for graceful closure of all connections, so we
         # process all pending buffers.
-        futs = []
-        for conn in self._conns.values():
-            futs.append(conn.close(reason=CloseReason.SHUTDOWN))
+        futs = [
+            conn.close(reason=CloseReason.SHUTDOWN) for conn in self._conns.values()
+        ]
         if futs:
             await asyncio.gather(*futs)

@@ -231,7 +230,7 @@ async def bootstrap(self):
             sasl_mechanism=self._sasl_mechanism,
             sasl_plain_username=self._sasl_plain_username,
             sasl_plain_password=self._sasl_plain_password,
-            sasl_kerberos_service_name=self._sasl_kerberos_service_name,  # noqa: E501
+            sasl_kerberos_service_name=self._sasl_kerberos_service_name,
             sasl_kerberos_domain_name=self._sasl_kerberos_domain_name,
             sasl_oauth_token_provider=self._sasl_oauth_token_provider,
             version_hint=version_hint,
@@ -370,7 +369,7 @@ def force_metadata_update(self):
         if not self._md_update_waiter.done():
             self._md_update_waiter.set_result(None)
         self._md_update_fut = self._loop.create_future()
-        # Metadata will be updated in the background by syncronizer
+        # Metadata will be updated in the background by synchronizer
         return asyncio.shield(self._md_update_fut)

     async def fetch_all_metadata(self):
@@ -413,10 +412,7 @@ def _on_connection_closed(self, conn, reason):
         """Callback called when connection is closed"""
         # Connection failures imply that our metadata is stale, so let's
         # refresh
-        if (
-            reason == CloseReason.CONNECTION_BROKEN
-            or reason == CloseReason.CONNECTION_TIMEOUT
-        ):
+        if reason in [CloseReason.CONNECTION_BROKEN, CloseReason.CONNECTION_TIMEOUT]:
             self.force_metadata_update()

     async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=False):
@@ -471,7 +467,7 @@ async def _get_conn(self, node_id, *, group=ConnectionGroup.DEFAULT, no_hint=Fal
             sasl_mechanism=self._sasl_mechanism,
             sasl_plain_username=self._sasl_plain_username,
             sasl_plain_password=self._sasl_plain_password,
-            sasl_kerberos_service_name=self._sasl_kerberos_service_name,  # noqa: E501
+            sasl_kerberos_service_name=self._sasl_kerberos_service_name,
             sasl_kerberos_domain_name=self._sasl_kerberos_domain_name,
             sasl_oauth_token_provider=self._sasl_oauth_token_provider,
             version_hint=version_hint,
@@ -511,7 +507,7 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
         if not (await self.ready(node_id, group=group)):
             raise NodeNotReadyError(
                 "Attempt to send a request to node"
-                " which is not ready (node id {}).".format(node_id)
+                f" which is not ready (node id {node_id})."
             )

         # Every request gets a response, except one special case:
@@ -524,10 +520,10 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
         )
         try:
             result = await future
-        except asyncio.TimeoutError:
+        except asyncio.TimeoutError as exc:
             # close connection so it is renewed in next request
             self._conns[(node_id, group)].close(reason=CloseReason.CONNECTION_TIMEOUT)
-            raise RequestTimedOutError()
+            raise RequestTimedOutError() from exc
         else:
             return result

@@ -536,14 +532,14 @@ async def check_version(self, node_id=None):
         if node_id is None:
             default_group_conns = [
                 n_id
-                for (n_id, group) in self._conns.keys()
+                for (n_id, group) in self._conns
                 if group == ConnectionGroup.DEFAULT
             ]
             if default_group_conns:
                 node_id = default_group_conns[0]
             else:
                 assert self.cluster.brokers(), "no brokers in metadata"
-                node_id = list(self.cluster.brokers())[0].nodeId
+                node_id = next(iter(self.cluster.brokers())).nodeId

         from aiokafka.protocol.admin import ApiVersionRequest_v0, ListGroupsRequest_v0
         from aiokafka.protocol.commit import (
@@ -577,14 +573,12 @@ async def check_version(self, node_id=None):
                 # so we send metadata request and wait response
                 task = create_task(conn.send(request))
                 await asyncio.wait([task], timeout=0.1)
-                try:
+                # metadata request can be cancelled in case
+                # of invalid correlationIds order
+                with contextlib.suppress(KafkaError):
                     await conn.send(MetadataRequest_v0([]))
-                except KafkaError:
-                    # metadata request can be cancelled in case
-                    # of invalid correlationIds order
-                    pass
                 response = await task
-            except KafkaError:
+            except KafkaError:  # noqa: PERF203
                 continue
             else:
                 # To avoid having a connection in undefined state
@@ -593,7 +587,7 @@ async def check_version(self, node_id=None):
                 if isinstance(request, ApiVersionRequest_v0):
                     # Starting from 0.10 kafka broker we determine version
                     # by looking at ApiVersionResponse
-                    version = self._check_api_version_response(response)
+                    return self._check_api_version_response(response)
                 return version

         raise UnrecognizedBrokerVersion()
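Two patterns recur throughout this file: flake8-simplify's SIM105 replaces try/except/pass with contextlib.suppress, and the `raise ... from exc` chaining (flake8-bugbear's B904) keeps the original traceback attached. A minimal sketch of both outside aiokafka, with a hypothetical error type:

import asyncio
import contextlib


class RequestTimedOut(Exception):  # hypothetical stand-in error type
    pass


async def await_cancelled(task: asyncio.Task) -> None:
    task.cancel()
    # SIM105: equivalent to try/await/except asyncio.CancelledError: pass
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def wait_reply(fut: asyncio.Future, timeout: float):
    try:
        return await asyncio.wait_for(fut, timeout)
    except asyncio.TimeoutError as exc:
        # B904: chain the cause so the original error shows in the traceback
        raise RequestTimedOut() from exc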

aiokafka/cluster.py

+12 -12

@@ -128,21 +128,20 @@ def available_partitions_for_topic(self, topic):
         """
         if topic not in self._partitions:
             return None
-        return set(
-            [
-                partition
-                for partition, metadata in self._partitions[topic].items()
-                if metadata.leader != -1
-            ]
-        )
+        return {
+            partition
+            for partition, metadata in self._partitions[topic].items()
+            if metadata.leader != -1
+        }

     def leader_for_partition(self, partition):
         """Return node_id of leader, -1 unavailable, None if unknown."""
         if partition.topic not in self._partitions:
             return None
-        elif partition.partition not in self._partitions[partition.topic]:
+        partitions = self._partitions[partition.topic]
+        if partition.partition not in partitions:
             return None
-        return self._partitions[partition.topic][partition.partition].leader
+        return partitions[partition.partition].leader

     def partitions_for_broker(self, broker_id):
         """Return TopicPartitions for which the broker is a leader.
@@ -222,7 +221,8 @@ def update_metadata(self, metadata):
         """
         if not metadata.brokers:
             log.warning("No broker metadata found in MetadataResponse -- ignoring.")
-            return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))
+            self.failed_update(Errors.MetadataEmptyBrokerList(metadata))
+            return

         _new_brokers = {}
         for broker in metadata.brokers:
@@ -342,11 +342,11 @@ def add_group_coordinator(self, group, response):
         if error_type is not Errors.NoError:
             log.error("GroupCoordinatorResponse error: %s", error_type)
             self._groups[group] = -1
-            return
+            return None

         # Use a coordinator-specific node id so that group requests
         # get a dedicated connection
-        node_id = "coordinator-{}".format(response.coordinator_id)
+        node_id = f"coordinator-{response.coordinator_id}"
         coordinator = BrokerMetadata(node_id, response.host, response.port, None)

         log.info("Group coordinator for %s is %s", group, coordinator)

aiokafka/codec.py

+3 -4

@@ -1,8 +1,7 @@
-from __future__ import annotations
-
 import gzip
 import io
 import struct
+from typing import Optional

 from typing_extensions import Buffer

@@ -32,7 +31,7 @@ def has_lz4() -> bool:
     return cramjam is not None


-def gzip_encode(payload: Buffer, compresslevel: int | None = None) -> bytes:
+def gzip_encode(payload: Buffer, compresslevel: Optional[int] = None) -> bytes:
     if not compresslevel:
         compresslevel = 9

@@ -188,7 +187,7 @@ def lz4_decode(payload: Buffer) -> bytes:
     return bytes(cramjam.lz4.decompress(payload))


-def zstd_encode(payload: Buffer, level: int | None = None) -> bytes:
+def zstd_encode(payload: Buffer, level: Optional[int] = None) -> bytes:
     if not has_zstd():
         raise NotImplementedError("Zstd codec is not available")
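This file is the commit message's "Return back to runtime-valid annotations": without `from __future__ import annotations`, annotations are evaluated when the function is defined, and PEP 604's `int | None` syntax only evaluates on Python 3.10+, so the typing.Optional spelling keeps the module importable on older interpreters. A sketch of the distinction:

from typing import Optional


# Evaluates fine at definition time on any supported Python 3 version.
def gzip_level(level: Optional[int] = None) -> int:
    return 9 if level is None else level


# Written as `level: int | None`, the same signature raises TypeError at
# import time on Python < 3.10 unless `from __future__ import annotations`
# defers annotation evaluation.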
