You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Describe the bug
If the Kafka server is down, send()ing a message and awaiting the results has two effects:
A log is sent every retry_backoff_ms
This is pretty spammy at the default 100ms
The await hangs apparently forever
If the result is asyncio.wait_for()'d with a timeout so that the thread can eventually continue, and then stop() is called on the producer:
The logs continue to be sent
The stop() hangs apparently forever
Expected behaviour
I would expect that both awaiting the result of a send() call (or awaiting a send_and_wait() call) and a stop() call eventually time out.
Environment (please complete the following information):
aiokafka version: 0.12.0
Kafka Broker version: Tested with 3.9.0 and 4.0.0
Other information: n/a
Reproducible example
"""Test what happens when kafka is down and the notifier sends a message."""importasynciofromaiokafkaimportAIOKafkaProducerimporttracebackasyncdefmain():
#### Kafka is running here###kp=AIOKafkaProducer(
bootstrap_servers=f"localhost:9092",
enable_idempotence=True,
acks='all',
request_timeout_ms=5000,
retry_backoff_ms=1000
)
awaitkp.start()
print("sending 1st message")
future=awaitkp.send("mytopic", b"foo1")
print("done")
awaitfuture#### Kafka is stopped here###print("sending 2nd message")
future=awaitkp.send("mytopic", b"foo2")
print("done")
try:
awaitasyncio.wait_for(future, 6)
exceptTimeoutErrorase:
traceback.print_exception(e)
print("stopping client")
awaitkp.stop()
print('done"')
if__name__=="__main__":
asyncio.run(main())
The above results in the output:
sending 1st message
Topic mytopic not found in cluster metadata
done
sending 2nd message
done
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Traceback (most recent call last):
File "/usr/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
return fut.result()
^^^^^^^^^^^^
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/<user>/test_manual/aiokafka_test.py", line 36, in main
await asyncio.wait_for(future, 6)
File "/usr/lib/python3.11/asyncio/tasks.py", line 492, in wait_for
raise exceptions.TimeoutError() from exc
TimeoutError
stopping client
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
*** continues seemingly forever ***
The text was updated successfully, but these errors were encountered:
Describe the bug
If the Kafka server is down,
send()
ing a message and awaiting the results has two effects:retry_backoff_ms
await
hangs apparently foreverIf the result is
asyncio.wait_for()
'd with a timeout so that the thread can eventually continue, and thenstop()
is called on the producer:stop()
hangs apparently foreverExpected behaviour
I would expect that both awaiting the result of a
send()
call (or awaiting asend_and_wait()
call) and astop()
call eventually time out.Environment (please complete the following information):
Reproducible example
The above results in the output:
The text was updated successfully, but these errors were encountered: