Description
Hello. I am using a Kafka consumer only to get the partitions for a topic. After that, I try to close the consumer, but I get a CancelledError. It seems that some time has to pass between start() and stop(), because adding await asyncio.sleep(1) between the two calls avoids the error.
Expected behaviour
Consumer stops and doesn't raise an error
Environment:
- aiokafka version: 0.5.2
- kafka-python version: 1.4.6
- Kafka Broker version: 5.3.1
Reproducible example
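import asyncio
import aiokafka
# Placeholders: substitute your own broker address and topic name.
bootstrap_servers = "localhost:9092"
my_topic = "my_topic"
# Top-level await: run this in IPython/Jupyter or inside a coroutine.
consumer = aiokafka.AIOKafkaConsumer(bootstrap_servers=bootstrap_servers, loop=asyncio.get_event_loop())
await consumer.start()
my_partitions = consumer.partitions_for_topic(my_topic)
await consumer.stop()  # raises CancelledError when called right after start()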
/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py in stop(self)
472 self._closed = True
473 if self._coordinator:
--> 474 yield from self._coordinator.close()
475 if self._fetcher:
476 yield from self._fetcher.close()
/usr/local/lib/python3.7/site-packages/aiokafka/consumer/group_coordinator.py in close(self)
160 def close(self):
161 self._reset_committed_task.cancel()
--> 162 yield from self._reset_committed_task
163 self._reset_committed_task = None
164
CancelledError:
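
One possible explanation, not verified against the aiokafka source: in the traceback, close() cancels a background task and then waits on it. In plain asyncio, if a task is cancelled before it has run its first step, the CancelledError is thrown at the very start of the coroutine, so a try/except inside the coroutine body never gets a chance to catch it, and the error reaches whoever awaits the task. The sketch below reproduces that behaviour with asyncio only. The names background_routine, cancel_and_wait and run_demo are hypothetical stand-ins, not aiokafka functions.

```python
import asyncio


async def background_routine():
    # Hypothetical stand-in for a long-running background task that
    # swallows cancellation so that awaiting it after cancel() is quiet.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass


async def cancel_and_wait(delay_before_cancel):
    task = asyncio.ensure_future(background_routine())
    if delay_before_cancel:
        # Yield to the event loop once so the task can enter its try block.
        await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # The task never reached its own except clause.
        return "CancelledError"
    return "clean stop"


def run_demo():
    # Use a private event loop so the demo does not touch the global one.
    loop = asyncio.new_event_loop()
    try:
        immediate = loop.run_until_complete(cancel_and_wait(False))
        delayed = loop.run_until_complete(cancel_and_wait(True))
    finally:
        loop.close()
    return immediate, delayed


if __name__ == "__main__":
    print(run_demo())
```

If this is what happens here, it would match the observation that a short sleep between start() and stop() avoids the error: the sleep gives the background task time to start before it is cancelled.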