Skip to content

Async requests hold indefinetely when more than one result is gathered #329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
GCBallesteros opened this issue Jul 1, 2021 · 4 comments
Closed
Assignees

Comments

@GCBallesteros
Copy link

GCBallesteros commented Jul 1, 2021

Code Sample, a copy-pastable example if possible

The following code will run without problems. However, if I uncomment the second call to execute on the async.gather the script hangs indefinitely. I've tried several variations including creating two different clients but those will also hang.

import asyncio
import os

from azure.kusto.data.aio import KustoClient
from azure.kusto.data.aio.client import KustoConnectionStringBuilder

from dotenv import load_dotenv

# Load env variables from .env
load_dotenv()

async def main():
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        connection_string=os.environ["KUSTO_CLUSTER"],
        aad_app_id=os.environ["ADX_CLIENT_ID"],
        app_key=os.environ["ADX_CLIENT_SECRET"],
        authority_id=os.environ["AAD_TENANT_ID"],
    )

    test_query = "iot_telemetry | take 1"

    async with KustoClient(kcsb) as client:
        results = await asyncio.gather(
            client.execute(os.environ["KUSTO_DATABASE"], test_query),
            # client.execute(os.environ["KUSTO_DATABASE"], test_query),  # if uncommented hang forever
        )

    return results


if __name__ == "__main__":
    res = asyncio.run(main())

    print(res)

Problem description

I want to run several queries simultaneously and with the current behaviour I can't use the async features. What I would expect is to perform non-blocking IO on both queries.

When I stop execution via Control+C I get the following stack trace:

❯ python ./scripts/async_kusto_test.py
^CTraceback (most recent call last):
  File "./scripts/async_kusto_test.py", line 70, in <module>
    res = asyncio.run(main())
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 62, in _cancel_all_tasks
    loop.run_until_complete(
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "./scripts/async_kusto_test.py", line 61, in main
    results = await asyncio.gather(
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 38, in execute
    return await self.execute_query(database, query, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 42, in execute_query
    return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 76, in _execute
    request_headers["Authorization"] = await self._auth_provider.acquire_authorization_header_async()
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/security.py", line 61, in acquire_authorization_header_a
sync
    return _get_header_from_dict(await self.token_provider.get_token_async())
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/_token_providers.py", line 86, in get_token_async
    with TokenProviderBase.lock:
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<main() done, defined at ./scripts/async_kusto_test.py:49> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "./scripts/async_kusto_test.py", line 70, in <module>
    res = asyncio.run(main())
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 62, in _cancel_all_tasks
    loop.run_until_complete(
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "./scripts/async_kusto_test.py", line 61, in main
    results = await asyncio.gather(
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 38, in execute
    return await self.execute_query(database, query, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 42, in execute_query
    return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py", line 76, in _execute
    request_headers["Authorization"] = await self._auth_provider.acquire_authorization_header_async()
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/security.py", line 61, in acquire_authorization_header_a
sync
    return _get_header_from_dict(await self.token_provider.get_token_async())
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/_token_providers.py", line 86, in get_token_async
    with TokenProviderBase.lock:
KeyboardInterrupt

If query related, does it happen on other platforms (Kusto Web UI, Kusto Explorer)?

Not query related. A single call to async execute and the sync client work fine.

Output of pip freeze

aiohttp==3.4.4
asgiref==3.2.10
async-timeout==3.0.1
attrs==21.2.0
azure-core==1.15.0
azure-identity==1.5.0
azure-kusto-data==2.1.3
backcall==0.2.0
certifi==2021.5.30
cffi==1.14.5
chardet==4.0.0
cryptography==3.4.7
decorator==5.0.9
idna==2.10
ipython==7.25.0
ipython-genutils==0.2.0
jedi==0.18.0
matplotlib-inline==0.1.2
msal==1.9.0
msal-extensions==0.3.0
multidict==4.7.6
numpy==1.21.0
pandas==1.2.5
parso==0.8.2
pexpect==4.8.0
pickleshare==0.7.5
portalocker==1.7.1
prompt-toolkit==3.0.19
ptyprocess==0.7.0
pycparser==2.20
Pygments==2.9.0
PyJWT==2.1.0
python-dateutil==2.8.1
python-dotenv==0.18.0
pytz==2021.1
requests==2.25.1
six==1.16.0
traitlets==5.0.5
urllib3==1.26.6
wcwidth==0.2.5
yarl==1.6.3

@AsafMah
Copy link
Collaborator

AsafMah commented Jul 15, 2021

We have now released version 2.2.1, that should solve your issue.
Let us know if that works

@GCBallesteros
Copy link
Author

It still doesn't work but now instead of hanging indefinetely it crashes with a KustoAuthenticationError. Please find below the new pip freeze and the stack trace (with minor edits to not leak ids and azure resources)

Stack Trace

Traceback (most recent call last):
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/security.py"
, line 57, in acquire_authorization_header_async
    return _get_header_from_dict(await self.token_provider.get_token_async())
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/_token_provi
ders.py", line 134, in get_token_async
    await self._init_once_async()
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/_token_provi
ders.py", line 94, in _init_once_async
    async with TokenProviderBase.async_lock:
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/locks.py", line 97, in __aenter__
    await self.acquire()
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/locks.py", line 203, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-3' coro=<KustoClient.execute() running at /home/guillem/.pyenv/ve
rsions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py:38> cb=[gather.<locals>._done
_callback() at /home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py:766]> got Future <Future pe
nding> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "scripts/async_kusto_test.py", line 71, in <module>
    res = asyncio.run(main())
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 616, in run_until_com
plete
    return future.result()
  File "scripts/async_kusto_test.py", line 61, in main
    results = await asyncio.gather(
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.p
y", line 38, in execute
    return await self.execute_query(database, query, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.p
y", line 42, in execute_query
    return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeou
t, properties)
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.p
y", line 76, in _execute
    request_headers["Authorization"] = await self._auth_provider.acquire_authorization_header_async()
  File "/home/guillem/.pyenv/versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/security.py"
, line 61, in acquire_authorization_header_async
    raise KustoAuthenticationError(self.token_provider.name(), error, **kwargs)
azure.kusto.data.exceptions.KustoAuthenticationError: KustoAuthenticationError('ApplicationKeyTokenProvider',
 'RuntimeError("Task <Task pending name='Task-3' coro=<KustoClient.execute() running at /home/guillem/.pyenv/
versions/DefrostCycles/lib/python3.8/site-packages/azure/kusto/data/aio/client.py:38> cb=[gather.<locals>._do
ne_callback() at /home/guillem/.pyenv/versions/3.8.6/lib/python3.8/asyncio/tasks.py:766]> got Future <Future
pending> attached to a different loop")', '{'authority': 'https://login.microsoftonline.com/xxxxxxxx-xxxx-xxxxx-xxxx-xxxxxxxxxxxx', 'client_id': 'xxxxxxxx-xxx-xxxx-xxxx-xxxxxxxxxxxx', 'resource': 'https://xyz.northeurope.kusto.windows.net'}')

Pip freeze

``` aiohttp==3.4.4 argon2-cffi==20.1.0 asgiref==3.2.10 async-generator==1.10 async-timeout==3.0.1 attrs==21.2.0 azure-core==1.15.0 azure-identity==1.5.0 azure-kusto-data==2.2.1 backcall==0.2.0 bleach==3.3.0 certifi==2021.5.30 cffi==1.14.5 chardet==4.0.0 cryptography==3.4.7 cycler==0.10.0 debugpy==1.3.0 decorator==5.0.9 defusedxml==0.7.1 entrypoints==0.3 idna==2.10 ipykernel==6.0.1 ipython==7.25.0 ipython-genutils==0.2.0 ipywidgets==7.6.3 jedi==0.18.0 Jinja2==3.0.1 jsonschema==3.2.0 jupyter==1.0.0 jupyter-client==6.1.12 jupyter-console==6.4.0 jupyter-core==4.7.1 jupyterlab-pygments==0.1.2 jupyterlab-widgets==1.0.0 jupytext==1.11.3 kiwisolver==1.3.1 markdown-it-py==1.1.0 MarkupSafe==2.0.1 matplotlib==3.4.2 matplotlib-inline==0.1.2 mdit-py-plugins==0.2.8 mistune==0.8.4 msal==1.9.0 msal-extensions==0.3.0 multidict==4.7.6 nbclient==0.5.3 nbconvert==6.1.0 nbformat==5.1.3 nest-asyncio==1.5.1 notebook==6.4.0 numpy==1.21.0 packaging==21.0 pandas==1.2.5 pandocfilters==1.4.3 parse==1.19.0 parso==0.8.2 pexpect==4.8.0 pickleshare==0.7.5 Pillow==8.3.0 portalocker==1.7.1 prometheus-client==0.11.0 prompt-toolkit==3.0.19 ptyprocess==0.7.0 pyarrow==4.0.1 pycparser==2.20 Pygments==2.9.0 PyJWT==2.1.0 pyparsing==2.4.7 pyrsistent==0.18.0 python-dateutil==2.8.1 python-dotenv==0.18.0 pytz==2021.1 PyYAML==5.4.1 pyzmq==22.1.0 qtconsole==5.1.1 QtPy==1.9.0 requests==2.25.1 river==0.7.1 scipy==1.7.0 seaborn==0.11.1 Send2Trash==1.7.1 six==1.16.0 terminado==0.10.1 testpath==0.5.0 toml==0.10.2 tornado==6.1 traitlets==5.0.5 urllib3==1.26.6 wcwidth==0.2.5 webencodings==0.5.1 widgetsnbextension==3.5.1 yarl==1.6.3 ```

@AsafMah
Copy link
Collaborator

AsafMah commented Jul 18, 2021

We now released 2.3.0, and now this should work properly.

@GCBallesteros
Copy link
Author

Now it's working. Thanks for taking a look at this!
The issue is ready to be closed on my side.

@AsafMah AsafMah closed this as completed Jul 19, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants