[QUESTION] aiokafka stucks after group rejoin #847
I managed to catch the problem with the DEBUG logging level on. That's what I've got.

We have a custom rebalance listener to log partition info, and as you can see, after a successful rejoin nothing happens:

```python
import logging

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener

log = logging.getLogger(__name__)


class RebalanceListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer: AIOKafkaConsumer = consumer

    async def on_partitions_revoked(self, revoked):
        log.info(f"Revoked {revoked}")
        for tp in revoked:
            log.info(f"{tp} offset is {await self.consumer.position(tp)}")

    async def on_partitions_assigned(self, assigned):
        log.info(f"Assigned {assigned}")
        for tp in assigned:
            log.info(f"{tp} offset is {await self.consumer.position(tp)}")


# later on ...
listener = RebalanceListener(consumer)
consumer.subscribe(topics=[tasks_topic], listener=listener)
```
We had to move to another library :(
@sam-dee, We have a similar issue. What library did you move to? kafka-python?
@c0nst-float, We use confluent-kafka with `poll` wrapped in `loop.run_in_executor`, which works pretty much the same. Here are the sources:

```python
import asyncio
import functools
import logging
from json import loads

from confluent_kafka import Consumer

# kafka_settings, dispatcher, and DispatchError come from the
# surrounding application
log = logging.getLogger(__name__)


async def listen_kafka():
    log.info("Starting listen Kafka")
    consumer = Consumer({
        'bootstrap.servers': kafka_settings.KAFKA_ADDR,
        'group.id': f"{kafka_settings.TASKS_TOPIC}_group",
        'auto.offset.reset': 'latest',
        'enable.auto.commit': True,
    })
    semaphore = asyncio.Semaphore(kafka_settings.NOF_SEM_TASKS)
    consumer.subscribe([kafka_settings.TASKS_TOPIC])
    loop = asyncio.get_running_loop()
    poll = functools.partial(consumer.poll, 1)
    try:
        while True:
            message = await loop.run_in_executor(None, poll)
            if message is None:
                continue
            if message.error():
                log.error(f"Error while getting message {message.error()}")
                continue
            async with semaphore:
                try:
                    msg = loads(message.value().decode('utf-8'))
                    asyncio.create_task(dispatcher.dispatch(msg))
                except DispatchError as e:
                    log.error(str(e))
    except asyncio.CancelledError:
        log.info("Stopping consumer...")
        consumer.close()
        log.info("Listen Kafka task stopped")
```
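The shutdown path above relies on cancelling the task and handling `asyncio.CancelledError`. A minimal, self-contained sketch of that pattern; `worker` and its cleanup flag are hypothetical stand-ins for `listen_kafka` and `consumer.close()`:

```python
import asyncio


async def worker():
    # Stand-in for listen_kafka: loops until cancelled, then
    # performs cleanup in the CancelledError handler.
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # consumer.close() would go here
        raise


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


cancelled = asyncio.run(main())
print(cancelled)  # True
```

One design note: this sketch re-raises `CancelledError` after cleanup, so the caller sees the task as cancelled; the snippet above swallows it instead, which also works but hides the cancellation from callers awaiting the task.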
How long does it take to execute `await dispatch(msg.value)`? It looks like this line of code is blocking the loop.
In our case it actually works pretty fast, no longer than 2-3 seconds. If it's stuck in your case, it might be a CPU-bound task, so consider moving it into a separate thread, e.g. `asyncio.create_task(loop.run_in_executor(None, dispatcher.dispatch, msg))`
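The suggestion above has a subtle but important detail: `loop.run_in_executor` takes the callable and its arguments separately. Writing `dispatcher.dispatch(msg)` inside the call would execute the handler on the event loop before the executor is ever involved. A sketch with a hypothetical CPU-bound handler:

```python
import asyncio


def cpu_bound_dispatch(msg):
    # Hypothetical stand-in for dispatcher.dispatch: a blocking,
    # CPU-bound handler that must not run on the event loop.
    return sum(i * i for i in range(10_000)) + len(msg)


async def main():
    loop = asyncio.get_running_loop()
    # Pass the function and its argument separately; the default
    # (None) executor runs it in a worker thread.
    return await loop.run_in_executor(None, cpu_bound_dispatch, "a message")


result = asyncio.run(main())
print(result)
```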
@ods I have bisected this issue to #802. Consider the following script:

```python
import asyncio
import logging
import time

import aiokafka


async def consume_task(consumer):
    async for msg in consumer:
        print(msg)
        await consumer.commit()


async def lag_task(producer):
    while True:
        print("sleeping")
        await asyncio.sleep(10)
        print("inducing lag")
        time.sleep(40)
        print("sending message")
        await producer.send_and_wait(
            topic='some_topic',
            value=b'a message',
        )


async def main():
    async with (
        aiokafka.AIOKafkaProducer() as producer,
        aiokafka.AIOKafkaConsumer(
            'some_topic',
            group_id='some_group',
            enable_auto_commit=False,
        ) as consumer,
    ):
        await consumer.seek_to_end()
        task1 = asyncio.create_task(consume_task(consumer))
        task2 = asyncio.create_task(lag_task(producer))
        await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)
        print("something finished")


if __name__ == '__main__':
    # logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())
```

Prior to #802 this will print:
After #802 this will print:
Notice the difference. If you turn on debug logging, you will see that after the induced lag we stop getting:

No new messages will ever be received by `consume_task`. Any ideas?
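The repro above hinges on `time.sleep(40)` in `lag_task`: a synchronous sleep freezes the entire event loop, so background coroutines such as aiokafka's heartbeat task cannot run on time, which is what provokes the group rejoin. A scaled-down, Kafka-free sketch of the mechanism (all names hypothetical):

```python
import asyncio
import time


async def heartbeat(intervals):
    # Simulates a periodic background task (like aiokafka's group
    # heartbeat); records how far apart its ticks actually land.
    prev = time.monotonic()
    for _ in range(3):
        await asyncio.sleep(0.1)
        now = time.monotonic()
        intervals.append(now - prev)
        prev = now


async def blocker():
    await asyncio.sleep(0.05)
    # Synchronous sleep: freezes the event loop, so the heartbeat
    # coroutine cannot run even though its tick is already due.
    time.sleep(0.5)


async def main():
    intervals = []
    await asyncio.gather(heartbeat(intervals), blocker())
    return intervals


intervals = asyncio.run(main())
print(intervals)  # the first interval is stretched by the blocking sleep
```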
Description
We have a long-running service which processes messages through aiokafka, running in a coroutine. Once in a few days,

Failed fetch messages from 1: [Error 7] RequestTimedOutError

happens for some reason, and after that no messages are consumed. It seems to rejoin the group properly after the RequestTimedOutError, but nothing happens.

Environment

The service is deployed into a python:3.10-slim Docker container.

Coroutine code used

Unfortunately, no decent text logs are left, yet we have a screenshot that shows the situation fairly well.

At the time the screenshot was taken, plenty of tasks had been in Kafka. They were successfully processed after a service restart.
Do you have any ideas how we can overcome or at least debug the problem?
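One low-effort debugging step, used earlier in this thread: enable DEBUG logging for aiokafka so that the fetcher and group-coordinator activity around the RequestTimedOutError is captured. A minimal sketch:

```python
import logging

# Keep the application at INFO, but let aiokafka's internal loggers
# (fetcher, coordinator, heartbeat) emit at DEBUG.
logging.basicConfig(level=logging.INFO)
logging.getLogger("aiokafka").setLevel(logging.DEBUG)
```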