Skip to content

Commit a1fc7ab

Browse files
authored
pyupgrade the code (#752)
* Run pyupgrade with the `--py36-plus` option. Since the **aiokafka** package is not compatible with Python versions lower than 3.6, the codebase can be upgraded. * Replace formats with f-strings. * Further replacements of formats with f-strings. * Fix lint errors.
1 parent 158873d commit a1fc7ab

27 files changed

+225
-209
lines changed

aiokafka/client.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ async def bootstrap(self):
246246
break
247247
else:
248248
raise KafkaConnectionError(
249-
'Unable to bootstrap from {}'.format(self.hosts))
249+
f'Unable to bootstrap from {self.hosts}')
250250

251251
# detect api version if need
252252
if self._api_version == 'auto':
@@ -544,12 +544,12 @@ async def check_version(self, node_id=None):
544544
conn = await self._get_conn(node_id, no_hint=True)
545545
if conn is None:
546546
raise KafkaConnectionError(
547-
"No connection to node with id {}".format(node_id))
547+
f"No connection to node with id {node_id}")
548548
for version, request in test_cases:
549549
try:
550550
if not conn.connected():
551551
await conn.connect()
552-
assert conn, 'no connection to node with id {}'.format(node_id)
552+
assert conn, f'no connection to node with id {node_id}'
553553
# request can be ignored by Kafka broker,
554554
# so we send metadata request and wait response
555555
task = create_task(conn.send(request))
@@ -591,10 +591,10 @@ def _check_api_version_response(self, response):
591591

592592
error_type = Errors.for_code(response.error_code)
593593
assert error_type is Errors.NoError, "API version check failed"
594-
max_versions = dict([
595-
(api_key, max_version)
594+
max_versions = {
595+
api_key: max_version
596596
for api_key, _, max_version in response.api_versions
597-
])
597+
}
598598
# Get the best match of test cases
599599
for broker_version, api_key, version in test_cases:
600600
if max_versions.get(api_key, -1) >= version:

aiokafka/conn.py

+27-31
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ def pick_best(self, request_versions):
6363
req_klass.API_VERSION <= supported_versions[1]:
6464
return req_klass
6565
raise Errors.KafkaError(
66-
"Could not pick a version for API_KEY={} from {}. ".format(
67-
api_key, supported_versions)
66+
f"Could not pick a version for API_KEY={api_key} from {supported_versions}."
6867
)
6968

7069

@@ -184,7 +183,7 @@ def __init__(self, host, port, *, client_id='aiokafka',
184183
# that
185184
def __del__(self, _warnings=warnings):
186185
if self.connected():
187-
_warnings.warn("Unclosed AIOKafkaConnection {!r}".format(self),
186+
_warnings.warn(f"Unclosed AIOKafkaConnection {self!r}",
188187
ResourceWarning,
189188
source=self)
190189
if self._loop.is_closed():
@@ -242,8 +241,8 @@ async def _do_version_lookup(self):
242241
versions = {}
243242
for api_key, min_version, max_version in response.api_versions:
244243
assert min_version <= max_version, (
245-
"{} should be less than or equal to {} for {}".format(
246-
min_version, max_version, api_key)
244+
f"{min_version} should be less than"
245+
f" or equal to {max_version} for {api_key}"
247246
)
248247
versions[api_key] = (min_version, max_version)
249248
self._version_info = VersionInfo(versions)
@@ -270,9 +269,9 @@ async def _do_sasl_handshake(self):
270269

271270
if self._sasl_mechanism not in response.enabled_mechanisms:
272271
exc = Errors.UnsupportedSaslMechanismError(
273-
'Kafka broker does not support %s sasl mechanism. '
274-
'Enabled mechanisms are: %s'
275-
% (self._sasl_mechanism, response.enabled_mechanisms))
272+
f"Kafka broker does not support {self._sasl_mechanism} sasl "
273+
f"mechanism. Enabled mechanisms are: {response.enabled_mechanisms}"
274+
)
276275
self.close(reason=CloseReason.AUTH_FAILURE, exc=exc)
277276
raise exc
278277

@@ -364,7 +363,7 @@ def sasl_principal(self):
364363
service = self._sasl_kerberos_service_name
365364
domain = self._sasl_kerberos_domain_name or self.host
366365

367-
return "{service}@{domain}".format(service=service, domain=domain)
366+
return f"{service}@{domain}"
368367

369368
@staticmethod
370369
def _on_read_task_error(self_ref, read_task):
@@ -401,7 +400,7 @@ def _idle_check(self_ref):
401400
wake_up_in, self._idle_check, self_ref)
402401

403402
def __repr__(self):
404-
return "<AIOKafkaConnection host={0.host} port={0.port}>".format(self)
403+
return f"<AIOKafkaConnection host={self.host} port={self.port}>"
405404

406405
@property
407406
def host(self):
@@ -414,8 +413,8 @@ def port(self):
414413
def send(self, request, expect_response=True):
415414
if self._writer is None:
416415
raise Errors.KafkaConnectionError(
417-
"No connection to broker at {0}:{1}"
418-
.format(self._host, self._port))
416+
f"No connection to broker at {self._host}:{self._port}"
417+
)
419418

420419
correlation_id = self._next_correlation_id()
421420
header = RequestHeader(request,
@@ -428,8 +427,8 @@ def send(self, request, expect_response=True):
428427
except OSError as err:
429428
self.close(reason=CloseReason.CONNECTION_BROKEN)
430429
raise Errors.KafkaConnectionError(
431-
"Connection at {0}:{1} broken: {2}".format(
432-
self._host, self._port, err))
430+
f"Connection at {self._host}:{self._port} broken: {err}"
431+
)
433432

434433
self.log.debug(
435434
'%s Request %d: %s', self, correlation_id, request)
@@ -443,17 +442,16 @@ def send(self, request, expect_response=True):
443442
def _send_sasl_token(self, payload, expect_response=True):
444443
if self._writer is None:
445444
raise Errors.KafkaConnectionError(
446-
"No connection to broker at {0}:{1}"
447-
.format(self._host, self._port))
448-
445+
f"No connection to broker at {self._host}:{self._port}"
446+
)
449447
size = struct.pack(">i", len(payload))
450448
try:
451449
self._writer.write(size + payload)
452450
except OSError as err:
453451
self.close(reason=CloseReason.CONNECTION_BROKEN)
454452
raise Errors.KafkaConnectionError(
455-
"Connection at {0}:{1} broken: {2}".format(
456-
self._host, self._port, err))
453+
f"Connection at {self._host}:{self._port} broken: {err}"
454+
)
457455

458456
if not expect_response:
459457
return self._writer.drain()
@@ -476,8 +474,8 @@ def close(self, reason=None, exc=None):
476474
for _, _, fut in self._requests:
477475
if not fut.done():
478476
error = Errors.KafkaConnectionError(
479-
"Connection at {0}:{1} closed".format(
480-
self._host, self._port))
477+
f"Connection at {self._host}:{self._port} closed"
478+
)
481479
if exc is not None:
482480
error.__cause__ = exc
483481
error.__context__ = exc
@@ -536,8 +534,9 @@ def _handle_frame(self, resp):
536534

537535
elif correlation_id != recv_correlation_id:
538536
error = Errors.CorrelationIdError(
539-
'Correlation ids do not match: sent {}, recv {}'
540-
.format(correlation_id, recv_correlation_id))
537+
f"Correlation ids do not match: sent {correlation_id},"
538+
f" recv {recv_correlation_id}"
539+
)
541540
if not fut.done():
542541
fut.set_exception(error)
543542
self.close(reason=CloseReason.OUT_OF_SYNC)
@@ -660,8 +659,7 @@ def __init__(self, *, loop, sasl_plain_password,
660659
self._authenticator = self.authenticator_scram()
661660

662661
def first_message(self):
663-
client_first_bare = 'n={},r={}'.format(
664-
self._sasl_plain_username, self._nonce)
662+
client_first_bare = f"n={self._sasl_plain_username},r={self._nonce}"
665663
self._auth_message += client_first_bare
666664
return 'n,,' + client_first_bare
667665

@@ -689,8 +687,8 @@ def process_server_first_message(self, server_first):
689687
self._server_key, self._auth_message.encode('utf-8'))
690688

691689
def final_message(self):
692-
return 'c=biws,r={},p={}'.format(
693-
self._nonce, base64.b64encode(self._client_proof).decode('utf-8'))
690+
client_proof = base64.b64encode(self._client_proof).decode('utf-8')
691+
return f"c=biws,r={self._nonce},p={client_proof}"
694692

695693
def process_server_final_message(self, server_final):
696694
params = dict(pair.split('=', 1) for pair in server_final.split(','))
@@ -734,9 +732,7 @@ async def step(self, payload):
734732
.encode("utf-8"), True
735733

736734
def _build_oauth_client_request(self, token, token_extensions):
737-
return "n,,\x01auth=Bearer {}{}\x01\x01".format(
738-
token, token_extensions
739-
)
735+
return f"n,,\x01auth=Bearer {token}{token_extensions}\x01\x01"
740736

741737
def _token_extensions(self):
742738
"""
@@ -751,7 +747,7 @@ def _token_extensions(self):
751747
extensions = self._sasl_oauth_token_provider.extensions()
752748
if len(extensions) > 0:
753749
msg = "\x01".join(
754-
["{}={}".format(k, v) for k, v in extensions.items()])
750+
[f"{k}={v}" for k, v in extensions.items()])
755751
return "\x01" + msg
756752

757753
return ""

aiokafka/consumer/consumer.py

+24-19
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
log = logging.getLogger(__name__)
2828

2929

30-
class AIOKafkaConsumer(object):
30+
class AIOKafkaConsumer:
3131
"""
3232
A client that consumes records from a Kafka cluster.
3333
@@ -322,7 +322,7 @@ def __init__(self, *topics, loop=None,
322322

323323
def __del__(self, _warnings=warnings):
324324
if self._closed is False:
325-
_warnings.warn("Unclosed AIOKafkaConsumer {!r}".format(self),
325+
_warnings.warn(f"Unclosed AIOKafkaConsumer {self!r}",
326326
ResourceWarning,
327327
source=self)
328328
context = {'consumer': self,
@@ -346,8 +346,9 @@ async def start(self):
346346
await self._wait_topics()
347347

348348
if self._client.api_version < (0, 9):
349-
raise ValueError("Unsupported Kafka version: {}".format(
350-
self._client.api_version))
349+
raise ValueError(
350+
f"Unsupported Kafka version: {self._client.api_version}"
351+
)
351352

352353
if self._isolation_level == "read_committed" and \
353354
self._client.api_version < (0, 11):
@@ -549,7 +550,7 @@ async def commit(self, offsets=None):
549550
for tp in offsets:
550551
if tp not in assignment.tps:
551552
raise IllegalStateError(
552-
"Partition {} is not assigned".format(tp))
553+
f"Partition {tp} is not assigned")
553554

554555
await self._coordinator.commit_offsets(assignment, offsets)
555556

@@ -630,7 +631,7 @@ async def position(self, partition):
630631
while True:
631632
if not self._subscription.is_assigned(partition):
632633
raise IllegalStateError(
633-
'Partition {} is not assigned'.format(partition))
634+
f'Partition {partition} is not assigned')
634635

635636
assignment = self._subscription.subscription.assignment
636637
tp_state = assignment.state_value(partition)
@@ -645,7 +646,7 @@ async def position(self, partition):
645646
if not tp_state.has_valid_position:
646647
if self._subscription.subscription is None:
647648
raise IllegalStateError(
648-
'Partition {} is not assigned'.format(partition))
649+
f'Partition {partition} is not assigned')
649650
if self._subscription.subscription.assignment is None:
650651
self._coordinator.check_errors()
651652
await self._subscription.wait_for_assignment()
@@ -772,7 +773,7 @@ async def seek_to_beginning(self, *partitions):
772773
)
773774
if not_assigned:
774775
raise IllegalStateError(
775-
"Partitions {} are not assigned".format(not_assigned))
776+
f"Partitions {not_assigned} are not assigned")
776777

777778
for tp in partitions:
778779
log.debug("Seeking to beginning of partition %s", tp)
@@ -814,7 +815,7 @@ async def seek_to_end(self, *partitions):
814815
)
815816
if not_assigned:
816817
raise IllegalStateError(
817-
"Partitions {} are not assigned".format(not_assigned))
818+
f"Partitions {not_assigned} are not assigned")
818819

819820
for tp in partitions:
820821
log.debug("Seeking to end of partition %s", tp)
@@ -861,7 +862,7 @@ async def seek_to_committed(self, *partitions):
861862
)
862863
if not_assigned:
863864
raise IllegalStateError(
864-
"Partitions {} are not assigned".format(not_assigned))
865+
f"Partitions {not_assigned} are not assigned")
865866

866867
committed_offsets = {}
867868
for tp in partitions:
@@ -909,14 +910,16 @@ async def offsets_for_times(self, timestamps):
909910
"""
910911
if self._client.api_version <= (0, 10, 0):
911912
raise UnsupportedVersionError(
912-
"offsets_for_times API not supported for cluster version {}"
913-
.format(self._client.api_version))
913+
"offsets_for_times API not supported"
914+
f" for cluster version {self._client.api_version}"
915+
)
914916
for tp, ts in timestamps.items():
915917
timestamps[tp] = int(ts)
916918
if ts < 0:
917919
raise ValueError(
918-
"The target time for partition {} is {}. The target time "
919-
"cannot be negative.".format(tp, ts))
920+
f"The target time for partition {tp} is {ts}."
921+
" The target time cannot be negative."
922+
)
920923
offsets = await self._fetcher.get_offsets_by_times(
921924
timestamps, self._request_timeout_ms)
922925
return offsets
@@ -948,8 +951,9 @@ async def beginning_offsets(self, partitions):
948951
"""
949952
if self._client.api_version <= (0, 10, 0):
950953
raise UnsupportedVersionError(
951-
"offsets_for_times API not supported for cluster version {}"
952-
.format(self._client.api_version))
954+
"offsets_for_times API not supported"
955+
f" for cluster version {self._client.api_version}"
956+
)
953957
offsets = await self._fetcher.beginning_offsets(
954958
partitions, self._request_timeout_ms)
955959
return offsets
@@ -983,8 +987,9 @@ async def end_offsets(self, partitions):
983987
"""
984988
if self._client.api_version <= (0, 10, 0):
985989
raise UnsupportedVersionError(
986-
"offsets_for_times API not supported for cluster version {}"
987-
.format(self._client.api_version))
990+
"offsets_for_times API not supported"
991+
f" for cluster version {self._client.api_version}"
992+
)
988993
offsets = await self._fetcher.end_offsets(
989994
partitions, self._request_timeout_ms)
990995
return offsets
@@ -1044,7 +1049,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
10441049
pattern = re.compile(pattern)
10451050
except re.error as err:
10461051
raise ValueError(
1047-
"{!r} is not a valid pattern: {}".format(pattern, err))
1052+
f"{pattern!r} is not a valid pattern: {err}")
10481053
self._subscription.subscribe_pattern(
10491054
pattern=pattern, listener=listener)
10501055
# NOTE: set_topics will trigger a rebalance, so the coordinator

aiokafka/consumer/fetcher.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def to_str(cls, value):
5353
if value == cls.NONE:
5454
return "none"
5555
else:
56-
return "timestamp({})".format(value)
56+
return f"timestamp({value})"
5757

5858

5959
class FetchResult:
@@ -145,8 +145,7 @@ def has_more(self):
145145
return self._partition_records is not None
146146

147147
def __repr__(self):
148-
return "<FetchResult position={!r}>".format(
149-
self._partition_records.next_fetch_offset)
148+
return f"<FetchResult position={self._partition_records.next_fetch_offset!r}>"
150149

151150

152151
class FetchError:
@@ -166,7 +165,7 @@ def check_raise(self):
166165
raise self._error
167166

168167
def __repr__(self):
169-
return "<FetchError error={!r}>".format(self._error)
168+
return f"<FetchError error={self._error!r}>"
170169

171170

172171
class PartitionRecords:
@@ -213,7 +212,7 @@ def _unpack_records(self):
213212
# This iterator will be closed after the exception, so we don't
214213
# try to drain other batches here. They will be refetched.
215214
raise Errors.CorruptRecordException(
216-
"Invalid CRC - {tp}".format(tp=tp))
215+
f"Invalid CRC - {tp}")
217216

218217
if self._isolation_level == READ_COMMITTED and \
219218
next_batch.producer_id is not None:
@@ -389,7 +388,7 @@ def __init__(
389388
self._isolation_level = READ_COMMITTED
390389
else:
391390
raise ValueError(
392-
"Incorrect isolation level {}".format(isolation_level))
391+
f"Incorrect isolation level {isolation_level}")
393392

394393
self._records = collections.OrderedDict()
395394
self._in_flight = set()

0 commit comments

Comments (0)