
AIOKafkaProducer appears to hang forever if Kafka is down for send_and_wait and stop() #1101

Open
MrCreosote opened this issue Mar 25, 2025 · 1 comment

MrCreosote commented Mar 25, 2025

Describe the bug
If the Kafka server is down, calling send() and awaiting the returned future has two effects:

  1. A log message is emitted every retry_backoff_ms
    • This is quite noisy at the default of 100 ms
  2. The await appears to hang forever
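Until the retry behaviour itself changes, the log noise can be reduced on the application side. Below is a minimal sketch of a rate-limiting `logging.Filter` that drops repeats of the same message template within a time window. `RateLimitFilter` and `install` are hypothetical names, not part of aiokafka, and the sketch assumes aiokafka logs under the `aiokafka` logger namespace. The filter is attached to a handler rather than a logger because filters on a logger are not applied to records propagated up from child loggers.

```python
import logging
import time


class RateLimitFilter(logging.Filter):
    """Drop repeats of the same log message template seen within `interval` seconds.

    Hypothetical helper, not part of aiokafka.
    """

    def __init__(self, interval: float = 10.0, clock=time.monotonic):
        super().__init__()
        self.interval = interval
        self.clock = clock
        self._last_seen = {}  # (logger name, message template) -> last emit time

    def filter(self, record: logging.LogRecord) -> bool:
        # Key on the unformatted template, so e.g. "Unable connect to node ..."
        # is throttled regardless of which node id or errno it carries.
        key = (record.name, str(record.msg))
        now = self.clock()
        last = self._last_seen.get(key)
        if last is not None and now - last < self.interval:
            return False  # same message emitted too recently: suppress it
        self._last_seen[key] = now
        return True


def install(interval: float = 10.0) -> logging.Handler:
    # Assumed logger namespace; the filter lives on the handler so that it also
    # sees records coming from child loggers such as "aiokafka.conn".
    handler = logging.StreamHandler()
    handler.addFilter(RateLimitFilter(interval))
    logger = logging.getLogger("aiokafka")
    logger.addHandler(handler)
    logger.propagate = False  # avoid printing each record twice via the root logger
    return handler
```

Calling `install(10.0)` once at startup would let each distinct message through at most once every 10 seconds. A blunter alternative is raising the level of the `aiokafka` logger, at the cost of hiding those errors entirely.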

If the future is wrapped in asyncio.wait_for() with a timeout so that the coroutine can eventually continue, and stop() is then called on the producer:

  • Log messages continue to be emitted
  • The stop() call appears to hang forever
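One way to keep shutdown from blocking indefinitely is to bound stop() the same way the reproducer bounds the send future. This is a minimal sketch: `stop_with_timeout` is a hypothetical helper, and because `asyncio.wait_for` cancels `stop()` on timeout, the producer may be left only partially shut down (for example with background tasks or connections still open).

```python
import asyncio
import logging

log = logging.getLogger(__name__)


async def stop_with_timeout(producer, timeout: float) -> bool:
    """Await producer.stop(), giving up after `timeout` seconds.

    Returns True if stop() completed and False if it timed out.
    Hypothetical helper; on timeout the stop() coroutine is cancelled.
    """
    try:
        await asyncio.wait_for(producer.stop(), timeout)
        return True
    except asyncio.TimeoutError:  # same as built-in TimeoutError on 3.11+
        log.warning("producer.stop() did not finish within %.1fs", timeout)
        return False
```

In the reproducer, `await kp.stop()` would become `await stop_with_timeout(kp, 10)`; the caller then decides whether to exit anyway.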

Expected behaviour
I would expect awaiting the result of send() (or awaiting send_and_wait()), as well as awaiting stop(), to eventually time out.
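Until that happens in the library, the expected behaviour can be approximated in user code by putting one deadline around both steps of a send: awaiting `send()` to enqueue the message, then awaiting the returned future for the broker's acknowledgement, as the reproducible example in this issue does. `send_with_deadline` is a hypothetical helper, not an aiokafka API; it only bounds how long the caller waits and does not stop the producer from retrying in the background.

```python
import asyncio


async def send_with_deadline(producer, topic: str, value: bytes, timeout: float):
    """Enqueue a message and wait for its acknowledgement within one deadline.

    Hypothetical helper. Raises asyncio.TimeoutError if either step
    (enqueueing or delivery) is still pending when the deadline passes.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    # Step 1: send() may itself wait (e.g. for metadata) before returning a future.
    fut = await asyncio.wait_for(producer.send(topic, value), timeout)
    # Step 2: wait for delivery with whatever time is left.
    remaining = max(deadline - loop.time(), 0.0)
    return await asyncio.wait_for(fut, remaining)
```

With a real producer this would be called as `await send_with_deadline(kp, "mytopic", b"foo2", 6)`.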

Environment:

  • aiokafka version: 0.12.0
  • Kafka Broker version: Tested with 3.9.0 and 4.0.0
  • Other information: n/a

Reproducible example

"""
Test what happens when kafka is down and the notifier sends a message.
"""

import asyncio
from aiokafka import AIOKafkaProducer
import traceback


async def main():
    ###
    # Kafka is running here
    ###
    kp = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        enable_idempotence=True,
        acks='all',
        request_timeout_ms=5000,
        retry_backoff_ms=1000
    )
    await kp.start()
    print("sending 1st message")
    future = await kp.send("mytopic", b"foo1")
    print("done")
    await future

    ###
    # Kafka is stopped here
    ###
    print("sending 2nd message")
    future = await kp.send("mytopic", b"foo2")
    print("done")
    try:
        await asyncio.wait_for(future, 6)
    except asyncio.TimeoutError as e:  # alias of built-in TimeoutError on 3.11+
        traceback.print_exception(e)
    print("stopping client")
    await kp.stop()
    print("done")


if __name__ == "__main__":
    asyncio.run(main())

Running the above produces the following output:

sending 1st message
Topic mytopic not found in cluster metadata
done
sending 2nd message
done
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Traceback (most recent call last):
  File "/usr/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/<user>/test_manual/aiokafka_test.py", line 36, in main
    await asyncio.wait_for(future, 6)
  File "/usr/lib/python3.11/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
TimeoutError
stopping client
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)

*** continues seemingly forever ***
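The chained CancelledError and TimeoutError in the traceback above is how asyncio.wait_for() reports a timeout: it cancels the awaited future and raises TimeoutError from the resulting CancelledError. The standard-library demo below shows this with plain futures and contrasts it with `asyncio.shield()`, which leaves the inner future pending. Whether cancelling the future returned by aiokafka's send() also withdraws the message from the producer's internal queue is not established by this report; the log lines after "stopping client" show the producer still retrying.

```python
import asyncio


async def demo() -> tuple:
    loop = asyncio.get_running_loop()

    # Plain wait_for: on timeout the awaited future is cancelled, and that
    # CancelledError becomes the __cause__ of the TimeoutError.
    plain = loop.create_future()
    try:
        await asyncio.wait_for(plain, 0.05)
    except asyncio.TimeoutError as exc:
        assert isinstance(exc.__cause__, asyncio.CancelledError)

    # wait_for + shield: the caller still times out, but the original future
    # stays pending and could be awaited again later.
    shielded = loop.create_future()
    try:
        await asyncio.wait_for(asyncio.shield(shielded), 0.05)
    except asyncio.TimeoutError:
        pass

    return plain.cancelled(), shielded.cancelled()


print(asyncio.run(demo()))  # (True, False)
```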

jainal09 commented Apr 9, 2025

+1 on this. Is there any way to stop or exit after failing to connect for a certain amount of time?
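One possible client-side approach is a TCP-level pre-flight check with a timeout before calling start() or send(). `broker_reachable` is a hypothetical helper built only on `asyncio.open_connection`; it confirms that the port accepts connections, not that a healthy broker is behind it, and it does not make aiokafka's own retries give up if the broker goes down later.

```python
import asyncio


async def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    Hypothetical pre-flight check; it says nothing about broker health.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Connection refused, unreachable host, or no answer within the timeout.
        return False
    writer.close()
    await writer.wait_closed()
    return True
```

For example, `if not await broker_reachable("localhost", 9092): raise SystemExit("Kafka unreachable")` fails fast instead of entering the retry loop. Splitting `bootstrap_servers` into host and port is left to the caller.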
