1
1
import asyncio
2
2
import logging
3
3
from collections import defaultdict
4
+ from collections .abc import Sequence
4
5
from ssl import SSLContext
5
- from typing import Any , Dict , List , Optional , Sequence , Tuple , Type , Union
6
+ from typing import Any , Optional , Union
6
7
7
8
import async_timeout
8
9
@@ -88,7 +89,7 @@ def __init__(
88
89
self ,
89
90
* ,
90
91
loop = None ,
91
- bootstrap_servers : Union [str , List [str ]] = "localhost" ,
92
+ bootstrap_servers : Union [str , list [str ]] = "localhost" ,
92
93
client_id : str = "aiokafka-" + __version__ ,
93
94
request_timeout_ms : int = 40000 ,
94
95
connections_max_idle_ms : int = 540000 ,
@@ -159,7 +160,7 @@ async def start(self):
159
160
log .debug ("AIOKafkaAdminClient started" )
160
161
self ._started = True
161
162
162
- def _matching_api_version (self , operation : Sequence [Type [Request ]]) -> int :
163
+ def _matching_api_version (self , operation : Sequence [type [Request ]]) -> int :
163
164
"""Find the latest version of the protocol operation
164
165
supported by both this library and the broker.
165
166
@@ -225,7 +226,7 @@ def _convert_new_topic_request(new_topic):
225
226
226
227
async def create_topics (
227
228
self ,
228
- new_topics : List [NewTopic ],
229
+ new_topics : list [NewTopic ],
229
230
timeout_ms : Optional [int ] = None ,
230
231
validate_only : bool = False ,
231
232
) -> Response :
@@ -267,7 +268,7 @@ async def create_topics(
267
268
268
269
async def delete_topics (
269
270
self ,
270
- topics : List [str ],
271
+ topics : list [str ],
271
272
timeout_ms : Optional [int ] = None ,
272
273
) -> Response :
273
274
"""Delete topics from the cluster.
@@ -284,7 +285,7 @@ async def delete_topics(
284
285
285
286
async def _get_cluster_metadata (
286
287
self ,
287
- topics : Optional [List [str ]] = None ,
288
+ topics : Optional [list [str ]] = None ,
288
289
) -> Response :
289
290
"""
290
291
Retrieve cluster metadata
@@ -295,30 +296,30 @@ async def _get_cluster_metadata(
295
296
request = req_cls (topics = topics )
296
297
return await self ._send_request (request )
297
298
298
- async def list_topics (self ) -> List [str ]:
299
+ async def list_topics (self ) -> list [str ]:
299
300
metadata = await self ._get_cluster_metadata (topics = None )
300
301
obj = metadata .to_object ()
301
302
return [t ["topic" ] for t in obj ["topics" ]]
302
303
303
304
async def describe_topics (
304
305
self ,
305
- topics : Optional [List [str ]] = None ,
306
- ) -> List [Any ]:
306
+ topics : Optional [list [str ]] = None ,
307
+ ) -> list [Any ]:
307
308
metadata = await self ._get_cluster_metadata (topics = topics )
308
309
obj = metadata .to_object ()
309
310
return obj ["topics" ]
310
311
311
- async def describe_cluster (self ) -> Dict [str , Any ]:
312
+ async def describe_cluster (self ) -> dict [str , Any ]:
312
313
metadata = await self ._get_cluster_metadata ()
313
314
obj = metadata .to_object ()
314
315
obj .pop ("topics" ) # We have 'describe_topics' for this
315
316
return obj
316
317
317
318
async def describe_configs (
318
319
self ,
319
- config_resources : List [ConfigResource ],
320
+ config_resources : list [ConfigResource ],
320
321
include_synonyms : bool = False ,
321
- ) -> List [Response ]:
322
+ ) -> list [Response ]:
322
323
"""Fetch configuration parameters for one or more Kafka resources.
323
324
324
325
:param config_resources: An list of ConfigResource objects.
@@ -360,8 +361,8 @@ async def describe_configs(
360
361
return await asyncio .gather (* futures )
361
362
362
363
async def alter_configs (
363
- self , config_resources : List [ConfigResource ]
364
- ) -> List [Response ]:
364
+ self , config_resources : list [ConfigResource ]
365
+ ) -> list [Response ]:
365
366
"""Alter configuration parameters of one or more Kafka resources.
366
367
:param config_resources: A list of ConfigResource objects.
367
368
:return: Appropriate version of AlterConfigsResponse class.
@@ -398,9 +399,9 @@ def _convert_alter_config_resource_request(config_resource):
398
399
@classmethod
399
400
def _convert_config_resources (
400
401
cls ,
401
- config_resources : List [ConfigResource ],
402
+ config_resources : list [ConfigResource ],
402
403
op_type : str = "describe" ,
403
- ) -> Tuple [ Dict [int , Any ], List [Any ]]:
404
+ ) -> tuple [ dict [int , Any ], list [Any ]]:
404
405
broker_resources = defaultdict (list )
405
406
topic_resources = []
406
407
if op_type == "describe" :
@@ -416,15 +417,15 @@ def _convert_config_resources(
416
417
return broker_resources , topic_resources
417
418
418
419
@staticmethod
419
- def _convert_topic_partitions (topic_partitions : Dict [str , NewPartitions ]):
420
+ def _convert_topic_partitions (topic_partitions : dict [str , NewPartitions ]):
420
421
return [
421
422
(topic_name , (new_part .total_count , new_part .new_assignments ))
422
423
for topic_name , new_part in topic_partitions .items ()
423
424
]
424
425
425
426
async def create_partitions (
426
427
self ,
427
- topic_partitions : Dict [str , NewPartitions ],
428
+ topic_partitions : dict [str , NewPartitions ],
428
429
timeout_ms : Optional [int ] = None ,
429
430
validate_only : bool = False ,
430
431
) -> Response :
@@ -455,10 +456,10 @@ async def create_partitions(
455
456
456
457
async def describe_consumer_groups (
457
458
self ,
458
- group_ids : List [str ],
459
+ group_ids : list [str ],
459
460
group_coordinator_id : Optional [int ] = None ,
460
461
include_authorized_operations : bool = False ,
461
- ) -> List [Response ]:
462
+ ) -> list [Response ]:
462
463
"""Describe a set of consumer groups.
463
464
464
465
Any errors are immediately raised.
@@ -508,8 +509,8 @@ async def describe_consumer_groups(
508
509
509
510
async def list_consumer_groups (
510
511
self ,
511
- broker_ids : Optional [List [int ]] = None ,
512
- ) -> List [ Tuple [Any , ...]]:
512
+ broker_ids : Optional [list [int ]] = None ,
513
+ ) -> list [ tuple [Any , ...]]:
513
514
"""List all consumer groups known to the cluster.
514
515
515
516
This returns a list of Consumer Group tuples. The tuples are
@@ -578,8 +579,8 @@ async def list_consumer_group_offsets(
578
579
self ,
579
580
group_id : str ,
580
581
group_coordinator_id : Optional [int ] = None ,
581
- partitions : Optional [List [TopicPartition ]] = None ,
582
- ) -> Dict [TopicPartition , OffsetAndMetadata ]:
582
+ partitions : Optional [list [TopicPartition ]] = None ,
583
+ ) -> dict [TopicPartition , OffsetAndMetadata ]:
583
584
"""Fetch Consumer Offsets for a single consumer group.
584
585
585
586
Note:
@@ -636,9 +637,9 @@ async def list_consumer_group_offsets(
636
637
637
638
async def delete_records (
638
639
self ,
639
- records_to_delete : Dict [TopicPartition , RecordsToDelete ],
640
+ records_to_delete : dict [TopicPartition , RecordsToDelete ],
640
641
timeout_ms : Optional [int ] = None ,
641
- ) -> Dict [TopicPartition , int ]:
642
+ ) -> dict [TopicPartition , int ]:
642
643
"""Delete records from partitions.
643
644
644
645
:param records_to_delete: A map of RecordsToDelete for each TopicPartition
@@ -681,7 +682,7 @@ async def delete_records(
681
682
682
683
@staticmethod
683
684
def _convert_records_to_delete (
684
- records_to_delete : Dict [str , List [ Tuple [int , RecordsToDelete ]]],
685
+ records_to_delete : dict [str , list [ tuple [int , RecordsToDelete ]]],
685
686
):
686
687
return [
687
688
(topic , [(partition , rec .before_offset ) for partition , rec in records ])
0 commit comments