Skip to content

Commit 69825fc

Browse files
authored
Cloud info - use same session (#588)
1 parent d5fcedf commit 69825fc

File tree

6 files changed

+60
-45
lines changed

6 files changed

+60
-45
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Changed
11+
- CloudSettings will now reuse the HTTP session from the query client for sync clients.
12+
1013
### Fixed
1114

12-
- CloudInfo / CloudSettings now cached by authority (schema, host and port) instead of full URL
15+
- CloudSettings is now cached by authority (scheme, host and port) instead of by full URL
1316

1417
## [5.0.3] - 2025-05-04
1518

azure-kusto-data/azure/kusto/data/_cloud_settings.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,28 +54,28 @@ class CloudSettings:
5454
first_party_authority_url=DEFAULT_FIRST_PARTY_AUTHORITY_URL,
5555
)
5656

57-
@classmethod
58-
@distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT)
59-
def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo:
60-
normalized_authority = cls._normalize_uri(kusto_uri)
61-
62-
# tracing attributes for cloud info
63-
Span.set_cloud_info_attributes(kusto_uri)
64-
65-
if normalized_authority in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
66-
return cls._cloud_cache[normalized_authority]
67-
68-
with cls._cloud_cache_lock:
69-
if normalized_authority in cls._cloud_cache:
70-
return cls._cloud_cache[normalized_authority]
71-
72-
url_parts = urlparse(kusto_uri)
57+
@classmethod
58+
@distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT)
59+
def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None, session: requests.Session = None) -> CloudInfo:
60+
normalized_authority = cls._normalize_uri(kusto_uri)
61+
62+
# tracing attributes for cloud info
63+
Span.set_cloud_info_attributes(kusto_uri)
64+
65+
if normalized_authority in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
66+
return cls._cloud_cache[normalized_authority]
67+
68+
with cls._cloud_cache_lock:
69+
if normalized_authority in cls._cloud_cache:
70+
return cls._cloud_cache[normalized_authority]
71+
72+
url_parts = urlparse(kusto_uri)
7373
url = f"{url_parts.scheme}://{url_parts.netloc}/{METADATA_ENDPOINT}"
7474

7575
try:
7676
# trace http get call for result
7777
result = MonitoredActivity.invoke(
78-
lambda: requests.get(url, proxies=proxies, allow_redirects=False),
78+
lambda: (session or requests).get(url, proxies=proxies, allow_redirects=False),
7979
name_of_span="CloudSettings.http_get",
8080
tracing_attributes=Span.create_http_attributes(url=url, method="GET"),
8181
)
@@ -87,32 +87,32 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str,
8787
if content is None or content == {}:
8888
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
8989
root = content["AzureAD"]
90-
if root is not None:
91-
cls._cloud_cache[normalized_authority] = CloudInfo(
92-
login_endpoint=root["LoginEndpoint"],
93-
login_mfa_required=root["LoginMfaRequired"],
94-
kusto_client_app_id=root["KustoClientAppId"],
95-
kusto_client_redirect_uri=root["KustoClientRedirectUri"],
96-
kusto_service_resource_id=root["KustoServiceResourceId"],
97-
first_party_authority_url=root["FirstPartyAuthorityUrl"],
98-
)
99-
else:
100-
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
101-
elif result.status_code == 404:
102-
# For now as long not all proxies implement the metadata endpoint, if no endpoint exists return public cloud data
103-
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
104-
else:
105-
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
90+
if root is not None:
91+
cls._cloud_cache[normalized_authority] = CloudInfo(
92+
login_endpoint=root["LoginEndpoint"],
93+
login_mfa_required=root["LoginMfaRequired"],
94+
kusto_client_app_id=root["KustoClientAppId"],
95+
kusto_client_redirect_uri=root["KustoClientRedirectUri"],
96+
kusto_service_resource_id=root["KustoServiceResourceId"],
97+
first_party_authority_url=root["FirstPartyAuthorityUrl"],
98+
)
99+
else:
100+
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
101+
elif result.status_code == 404:
102+
# For now, as long as not all proxies implement the metadata endpoint, return public cloud data if no endpoint exists
103+
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
104+
else:
105+
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
106106
return cls._cloud_cache[normalized_authority]
107107

108108
@classmethod
109109
def add_to_cache(cls, url: str, cloud_info: CloudInfo):
110110
with cls._cloud_cache_lock:
111111
cls._cloud_cache[cls._normalize_uri(url)] = cloud_info
112112

113-
@classmethod
114-
def _normalize_uri(cls, kusto_uri):
115-
"""Extracts and returns the authority part of the URI (schema, host, port)"""
116-
url_parts = urlparse(kusto_uri)
117-
# Return only the scheme and netloc (which contains host and port if present)
113+
@classmethod
114+
def _normalize_uri(cls, kusto_uri):
115+
"""Extracts and returns the authority part of the URI (schema, host, port)"""
116+
url_parts = urlparse(kusto_uri)
117+
# Return only the scheme and netloc (which contains host and port if present)
118118
return f"{url_parts.scheme}://{url_parts.netloc}"

azure-kusto-data/azure/kusto/data/_token_providers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from threading import Lock
99
from typing import Callable, Coroutine, List, Optional, Any
1010

11+
import requests
1112
from azure.core.exceptions import ClientAuthenticationError
1213
from azure.core.tracing import SpanKind
1314
from azure.core.tracing.decorator import distributed_trace
@@ -92,6 +93,7 @@ class TokenProviderBase(abc.ABC):
9293

9394
def __init__(self, is_async: bool = False):
9495
self._proxy_dict: Optional[str, str] = None
96+
self._session: Optional[requests.Session] = None
9597
self.is_async = is_async
9698

9799
if is_async:
@@ -262,6 +264,9 @@ def _valid_token_or_throw(self, token: dict, context: str = "") -> dict:
262264
def set_proxy(self, proxy_url: str):
263265
self._proxy_dict = {"http": proxy_url, "https": proxy_url}
264266

267+
def set_session(self, session: requests.Session):
268+
self._session = session
269+
265270

266271
class CloudInfoTokenProvider(TokenProviderBase, abc.ABC):
267272
_cloud_info: Optional[CloudInfo]
@@ -274,7 +279,7 @@ def __init__(self, kusto_uri: str, is_async: bool = False):
274279

275280
def _init_resources(self):
276281
if self._kusto_uri is not None:
277-
self._cloud_info = CloudSettings.get_cloud_info_for_cluster(self._kusto_uri, self._proxy_dict)
282+
self._cloud_info = CloudSettings.get_cloud_info_for_cluster(self._kusto_uri, self._proxy_dict, self._session)
278283
resource_uri = self._cloud_info.kusto_service_resource_id
279284
if self._cloud_info.login_mfa_required:
280285
resource_uri = resource_uri.replace(".kusto.", ".kustomfa.")

azure-kusto-data/azure/kusto/data/client_base.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
import io
33
import json
44
import uuid
5-
from copy import copy
65
from datetime import timedelta
76
from typing import Union, Optional, Any, NoReturn, ClassVar, TYPE_CHECKING
87
from urllib.parse import urljoin
98

10-
from requests import Response
9+
from requests import Response, Session
1110

1211
from azure.kusto.data._cloud_settings import CloudSettings
1312
from azure.kusto.data._token_providers import CloudInfoTokenProvider
@@ -34,6 +33,7 @@ class _KustoClientBase(abc.ABC):
3433
_aad_helper: _AadHelper
3534
client_details: ClientDetails
3635
_endpoint_validated = False
36+
_session: Union["aiohttp.ClientSession", "Session"]
3737

3838
def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], is_async):
3939
self._kcsb = kcsb
@@ -73,13 +73,20 @@ def set_proxy(self, proxy_url: str):
7373
self._proxy_url = proxy_url
7474
if self._aad_helper:
7575
self._aad_helper.token_provider.set_proxy(proxy_url)
76+
if isinstance(self._session, Session):
77+
self._aad_helper.token_provider.set_session(self._session)
7678

7779
def validate_endpoint(self):
7880
if not self._endpoint_validated and self._aad_helper is not None:
7981
if isinstance(self._aad_helper.token_provider, CloudInfoTokenProvider):
82+
endpoint = CloudSettings.get_cloud_info_for_cluster(
83+
self._kusto_cluster,
84+
self._aad_helper.token_provider._proxy_dict,
85+
self._session if isinstance(self._session, Session) else None,
86+
).login_endpoint
8087
well_known_kusto_endpoints.validate_trusted_endpoint(
8188
self._kusto_cluster,
82-
CloudSettings.get_cloud_info_for_cluster(self._kusto_cluster, self._aad_helper.token_provider._proxy_dict).login_endpoint,
89+
endpoint,
8390
)
8491
self._endpoint_validated = True
8592

azure-kusto-data/tests/test_kusto_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def test_custom_request_id(self, mock_post, method):
181181
self._assert_sanity_query_response(response)
182182
self._assert_client_request_id(mock_post.call_args[-1], value=request_id)
183183

184-
@patch("requests.get", side_effect=mocked_requests_post)
184+
@patch("requests.Session.get", side_effect=mocked_requests_post)
185185
def test_proxy_token_providers(self, mock_get, proxy_kcsb):
186186
"""Test query V2."""
187187
proxy = "https://my_proxy.sample"

azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def ingest_from_dataframe(
107107
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
108108
:param pandas.DataFrame df: input dataframe to ingest.
109109
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
110-
:param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSOn or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
110+
:param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
111111
"""
112112

113113
if self._is_closed:

0 commit comments

Comments
 (0)