Skip to content

Commit 4fbea5c

Browse files
authored
Don't pass loop explicitly in docs and examples (#693)
1 parent 07e9bd3 commit 4fbea5c

28 files changed

+166
-355
lines changed

CHANGES.rst

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ Exclude `.so` from source distribution
1212
689.bugfix
1313
Add `dataclasses` backport package to dependencies for Python 3.6
1414

15+
693.doc
16+
Update docs and examples to not use deprecated practices like passing loop explicitly
17+
1518

1619
0.7.0 (2020-10-28)
1720
==================

README.rst

+5-10
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,8 @@ Example of AIOKafkaProducer usage:
2525
from aiokafka import AIOKafkaProducer
2626
import asyncio
2727
28-
loop = asyncio.get_event_loop()
29-
3028
async def send_one():
31-
producer = AIOKafkaProducer(
32-
loop=loop, bootstrap_servers='localhost:9092')
29+
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
3330
# Get cluster layout and initial topic/partition leadership information
3431
await producer.start()
3532
try:
@@ -39,14 +36,14 @@ Example of AIOKafkaProducer usage:
3936
# Wait for all pending messages to be delivered or expire.
4037
await producer.stop()
4138
42-
loop.run_until_complete(send_one())
39+
asyncio.run(send_one())
4340
4441
4542
AIOKafkaConsumer
4643
****************
4744

4845
AIOKafkaConsumer is a high-level, asynchronous message consumer.
49-
It interacts with the assigned Kafka Group Coordinator node to allow multiple
46+
It interacts with the assigned Kafka Group Coordinator node to allow multiple
5047
consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
5148

5249
Example of AIOKafkaConsumer usage:
@@ -56,12 +53,10 @@ Example of AIOKafkaConsumer usage:
5653
from aiokafka import AIOKafkaConsumer
5754
import asyncio
5855
59-
loop = asyncio.get_event_loop()
60-
6156
async def consume():
6257
consumer = AIOKafkaConsumer(
6358
'my_topic', 'my_other_topic',
64-
loop=loop, bootstrap_servers='localhost:9092',
59+
bootstrap_servers='localhost:9092',
6560
group_id="my-group")
6661
# Get cluster layout and join group `my-group`
6762
await consumer.start()
@@ -74,7 +69,7 @@ Example of AIOKafkaConsumer usage:
7469
# Will leave consumer group; perform autocommit if enabled.
7570
await consumer.stop()
7671
77-
loop.run_until_complete(consume())
72+
asyncio.run(consume())
7873
7974
Running tests
8075
-------------

aiokafka/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ async def close(self):
183183

184184
async def bootstrap(self):
185185
"""Try to to bootstrap initial cluster metadata"""
186-
assert self._loop is asyncio.get_event_loop(), (
186+
assert self._loop is get_running_loop(), (
187187
"Please create objects with the same loop as running with"
188188
)
189189
# using request v0 for bootstrap if not sure v1 is available

aiokafka/consumer/consumer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ async def start(self):
334334
* Wait for possible topic autocreation
335335
* Join group if ``group_id`` provided
336336
"""
337-
assert self._loop is asyncio.get_event_loop(), (
337+
assert self._loop is get_running_loop(), (
338338
"Please create objects with the same loop as running with"
339339
)
340340
assert self._fetcher is None, "Did you call `start` twice?"

aiokafka/producer/producer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ def __del__(self, _warnings=warnings):
283283

284284
async def start(self):
285285
"""Connect to Kafka cluster and check server version"""
286-
assert self._loop is asyncio.get_event_loop(), (
286+
assert self._loop is get_running_loop(), (
287287
"Please create objects with the same loop as running with"
288288
)
289289
log.debug("Starting the Kafka producer") # trace

benchmark/simple_consume_bench.py

+2-17
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ async def bench_simple(self):
5050
consumer = AIOKafkaConsumer(
5151
topic, group_id="test_group", auto_offset_reset="earliest",
5252
enable_auto_commit=False,
53-
bootstrap_servers=self._bootstrap_servers,
54-
loop=loop)
53+
bootstrap_servers=self._bootstrap_servers)
5554
await consumer.start()
5655

5756
# We start from after producer connect
@@ -116,21 +115,7 @@ def main():
116115
import uvloop
117116
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
118117

119-
loop = asyncio.get_event_loop()
120-
task = loop.create_task(Benchmark(args).bench_simple())
121-
task.add_done_callback(lambda _, loop=loop: loop.stop())
122-
123-
def signal_hndl(_task=task):
124-
_task.cancel()
125-
loop.add_signal_handler(signal.SIGTERM, signal_hndl)
126-
loop.add_signal_handler(signal.SIGINT, signal_hndl)
127-
128-
try:
129-
loop.run_forever()
130-
finally:
131-
loop.close()
132-
if not task.cancelled():
133-
task.result()
118+
asyncio.run(Benchmark(args).bench_simple())
134119

135120

136121
if __name__ == "__main__":

benchmark/simple_produce_bench.py

+2-16
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def bench_simple(self):
6565
partition = self._partition
6666
loop = asyncio.get_event_loop()
6767

68-
producer = AIOKafkaProducer(loop=loop, **self._producer_kwargs)
68+
producer = AIOKafkaProducer(**self._producer_kwargs)
6969
await producer.start()
7070

7171
# We start from after producer connect
@@ -139,21 +139,7 @@ def main():
139139
import uvloop
140140
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
141141

142-
loop = asyncio.get_event_loop()
143-
task = loop.create_task(Benchmark(args).bench_simple())
144-
task.add_done_callback(lambda _, loop=loop: loop.stop())
145-
146-
def signal_hndl(_task=task):
147-
_task.cancel()
148-
loop.add_signal_handler(signal.SIGTERM, signal_hndl)
149-
loop.add_signal_handler(signal.SIGINT, signal_hndl)
150-
151-
try:
152-
loop.run_forever()
153-
finally:
154-
loop.close()
155-
if not task.cancelled():
156-
task.result()
142+
asyncio.run(Benchmark(args).bench_simple())
157143

158144

159145
if __name__ == "__main__":

docker/build.py

+4-8
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import argparse
66

77

8-
@asyncio.coroutine
9-
def build(versions_file, args, *, loop):
8+
async def build(versions_file, args, *, loop):
109

1110
with open(versions_file) as f:
1211
config = yaml.load(f.read())
@@ -21,18 +20,16 @@ def build(versions_file, args, *, loop):
2120
action=action,
2221
image_name=config['image_name'],
2322
**version_map))
24-
proc = yield from asyncio.create_subprocess_exec(*args, loop=loop)
23+
proc = await asyncio.create_subprocess_exec(*args)
2524
procs.append(proc.wait())
2625

27-
res = yield from asyncio.gather(*procs, loop=loop)
26+
res = await asyncio.gather(*procs)
2827
if any(res): # If any of statuses are not 0 return right away
2928
return res
3029
return res
3130

3231

3332
if __name__ == '__main__':
34-
loop = asyncio.get_event_loop()
35-
3633
parser = argparse.ArgumentParser(
3734
description='Build and push images in parallel.')
3835
parser.add_argument(
@@ -41,6 +38,5 @@ def build(versions_file, args, *, loop):
4138

4239
args = parser.parse_args()
4340

44-
statuses = loop.run_until_complete(build('config.yml', args, loop=loop))
45-
loop.close()
41+
statuses = asyncio.run(build('config.yml', args))
4642
sys.exit(max(statuses))

0 commit comments

Comments
 (0)