diff --git a/CHANGELOG.md b/CHANGELOG.md index 4faeaf12..39b991d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed +- CloudSettings will now reuse the HTTP session from the query client for sync clients. + ### Fixed -- CloudInfo / CloudSettings now cached by authority (schema, host and port) instead of full URL +- CloudSettings now cached by authority (schema, host and port) instead of full URL ## [5.0.3] - 2025-05-04 diff --git a/azure-kusto-data/azure/kusto/data/_cloud_settings.py b/azure-kusto-data/azure/kusto/data/_cloud_settings.py index 7f1f34c9..0c97cd37 100644 --- a/azure-kusto-data/azure/kusto/data/_cloud_settings.py +++ b/azure-kusto-data/azure/kusto/data/_cloud_settings.py @@ -54,28 +54,28 @@ class CloudSettings: first_party_authority_url=DEFAULT_FIRST_PARTY_AUTHORITY_URL, ) - @classmethod - @distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT) - def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo: - normalized_authority = cls._normalize_uri(kusto_uri) - - # tracing attributes for cloud info - Span.set_cloud_info_attributes(kusto_uri) - - if normalized_authority in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access - return cls._cloud_cache[normalized_authority] - - with cls._cloud_cache_lock: - if normalized_authority in cls._cloud_cache: - return cls._cloud_cache[normalized_authority] - - url_parts = urlparse(kusto_uri) + @classmethod + @distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT) + def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None, session: requests.Session = None) -> CloudInfo: + normalized_authority = cls._normalize_uri(kusto_uri) + + # tracing attributes for cloud info + Span.set_cloud_info_attributes(kusto_uri) + + if normalized_authority in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access + return cls._cloud_cache[normalized_authority] + + with cls._cloud_cache_lock: + if normalized_authority in cls._cloud_cache: + return cls._cloud_cache[normalized_authority] + + url_parts = urlparse(kusto_uri) url = f"{url_parts.scheme}://{url_parts.netloc}/{METADATA_ENDPOINT}" try: # trace http get call for result result = MonitoredActivity.invoke( - lambda: requests.get(url, proxies=proxies, allow_redirects=False), + lambda: (session or requests).get(url, proxies=proxies, allow_redirects=False), name_of_span="CloudSettings.http_get", tracing_attributes=Span.create_http_attributes(url=url, method="GET"), ) @@ -87,22 +87,22 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, if content is None or content == {}: raise KustoServiceError("Kusto returned an invalid cloud metadata response", result) root = content["AzureAD"] - if root is not None: - cls._cloud_cache[normalized_authority] = CloudInfo( - login_endpoint=root["LoginEndpoint"], - login_mfa_required=root["LoginMfaRequired"], - kusto_client_app_id=root["KustoClientAppId"], - kusto_client_redirect_uri=root["KustoClientRedirectUri"], - kusto_service_resource_id=root["KustoServiceResourceId"], - first_party_authority_url=root["FirstPartyAuthorityUrl"], - ) - else: - cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD - elif result.status_code == 404: - # For now as long not all proxies implement the metadata endpoint, if no endpoint exists return public cloud data - cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD - else: - raise KustoServiceError("Kusto returned an invalid cloud metadata response", result) + if root is not None: + cls._cloud_cache[normalized_authority] = CloudInfo( + login_endpoint=root["LoginEndpoint"], + login_mfa_required=root["LoginMfaRequired"], + kusto_client_app_id=root["KustoClientAppId"], + kusto_client_redirect_uri=root["KustoClientRedirectUri"], + kusto_service_resource_id=root["KustoServiceResourceId"], + first_party_authority_url=root["FirstPartyAuthorityUrl"], + ) + else: + cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD + elif result.status_code == 404: + # For now as long not all proxies implement the metadata endpoint, if no endpoint exists return public cloud data + cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD + else: + raise KustoServiceError("Kusto returned an invalid cloud metadata response", result) return cls._cloud_cache[normalized_authority] @classmethod @@ -110,9 +110,9 @@ def add_to_cache(cls, url: str, cloud_info: CloudInfo): with cls._cloud_cache_lock: cls._cloud_cache[cls._normalize_uri(url)] = cloud_info - @classmethod - def _normalize_uri(cls, kusto_uri): - """Extracts and returns the authority part of the URI (schema, host, port)""" - url_parts = urlparse(kusto_uri) - # Return only the scheme and netloc (which contains host and port if present) + @classmethod + def _normalize_uri(cls, kusto_uri): + """Extracts and returns the authority part of the URI (schema, host, port)""" + url_parts = urlparse(kusto_uri) + # Return only the scheme and netloc (which contains host and port if present) return f"{url_parts.scheme}://{url_parts.netloc}" diff --git a/azure-kusto-data/azure/kusto/data/_token_providers.py b/azure-kusto-data/azure/kusto/data/_token_providers.py index d0949748..69345ede 100644 --- a/azure-kusto-data/azure/kusto/data/_token_providers.py +++ b/azure-kusto-data/azure/kusto/data/_token_providers.py @@ -8,6 +8,7 @@ from threading import Lock from typing import Callable, Coroutine, List, Optional, Any +import requests from azure.core.exceptions import ClientAuthenticationError from azure.core.tracing import SpanKind from azure.core.tracing.decorator import distributed_trace @@ -92,6 +93,7 @@ class TokenProviderBase(abc.ABC): def __init__(self, is_async: bool = False): self._proxy_dict: Optional[str, str] = None + self._session: Optional[requests.Session] = None self.is_async = is_async if is_async: @@ -262,6 +264,9 @@ def _valid_token_or_throw(self, token: dict, context: str = "") -> dict: def set_proxy(self, proxy_url: str): self._proxy_dict = {"http": proxy_url, "https": proxy_url} + def set_session(self, session: requests.Session): + self._session = session + class CloudInfoTokenProvider(TokenProviderBase, abc.ABC): _cloud_info: Optional[CloudInfo] @@ -274,7 +279,7 @@ def __init__(self, kusto_uri: str, is_async: bool = False): def _init_resources(self): if self._kusto_uri is not None: - self._cloud_info = CloudSettings.get_cloud_info_for_cluster(self._kusto_uri, self._proxy_dict) + self._cloud_info = CloudSettings.get_cloud_info_for_cluster(self._kusto_uri, self._proxy_dict, self._session) resource_uri = self._cloud_info.kusto_service_resource_id if self._cloud_info.login_mfa_required: resource_uri = resource_uri.replace(".kusto.", ".kustomfa.") diff --git a/azure-kusto-data/azure/kusto/data/client_base.py b/azure-kusto-data/azure/kusto/data/client_base.py index 168d8421..12d387ff 100644 --- a/azure-kusto-data/azure/kusto/data/client_base.py +++ b/azure-kusto-data/azure/kusto/data/client_base.py @@ -2,12 +2,11 @@ import io import json import uuid -from copy import copy from datetime import timedelta from typing import Union, Optional, Any, NoReturn, ClassVar, TYPE_CHECKING from urllib.parse import urljoin -from requests import Response +from requests import Response, Session from azure.kusto.data._cloud_settings import CloudSettings from azure.kusto.data._token_providers import CloudInfoTokenProvider @@ -34,6 +33,7 @@ class _KustoClientBase(abc.ABC): _aad_helper: _AadHelper client_details: ClientDetails _endpoint_validated = False + _session: Union["aiohttp.ClientSession", "Session"] def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], is_async): self._kcsb = kcsb @@ -73,13 +73,20 @@ def set_proxy(self, proxy_url: str): self._proxy_url = proxy_url if self._aad_helper: self._aad_helper.token_provider.set_proxy(proxy_url) + if isinstance(self._session, Session): + self._aad_helper.token_provider.set_session(self._session) def validate_endpoint(self): if not self._endpoint_validated and self._aad_helper is not None: if isinstance(self._aad_helper.token_provider, CloudInfoTokenProvider): + endpoint = CloudSettings.get_cloud_info_for_cluster( + self._kusto_cluster, + self._aad_helper.token_provider._proxy_dict, + self._session if isinstance(self._session, Session) else None, + ).login_endpoint well_known_kusto_endpoints.validate_trusted_endpoint( self._kusto_cluster, - CloudSettings.get_cloud_info_for_cluster(self._kusto_cluster, self._aad_helper.token_provider._proxy_dict).login_endpoint, + endpoint, ) self._endpoint_validated = True diff --git a/azure-kusto-data/tests/test_kusto_client.py b/azure-kusto-data/tests/test_kusto_client.py index 88e61a78..ab19a712 100644 --- a/azure-kusto-data/tests/test_kusto_client.py +++ b/azure-kusto-data/tests/test_kusto_client.py @@ -181,7 +181,7 @@ def test_custom_request_id(self, mock_post, method): self._assert_sanity_query_response(response) self._assert_client_request_id(mock_post.call_args[-1], value=request_id) - @patch("requests.get", side_effect=mocked_requests_post) + @patch("requests.Session.get", side_effect=mocked_requests_post) def test_proxy_token_providers(self, mock_get, proxy_kcsb): """Test query V2.""" proxy = "https://my_proxy.sample" diff --git a/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py index 52935476..d97fc20e 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py @@ -107,7 +107,7 @@ def ingest_from_dataframe( https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods :param pandas.DataFrame df: input dataframe to ingest. :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. - :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSOn or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON. + :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON. """ if self._is_closed: