diff --git a/CHANGES/11352.bugfix.rst b/CHANGES/11352.bugfix.rst new file mode 100644 index 00000000000..3ccc8a58d2f --- /dev/null +++ b/CHANGES/11352.bugfix.rst @@ -0,0 +1,2 @@ +Fixed :class:`~aiohttp.DigestAuthMiddleware` to preserve the algorithm case from the server's challenge in the authorization response. This improves compatibility with servers that perform case-sensitive algorithm matching (e.g., servers expecting ``algorithm=MD5-sess`` instead of ``algorithm=MD5-SESS``) +-- by :user:`bdraco`. diff --git a/aiohttp/client_middleware_digest_auth.py b/aiohttp/client_middleware_digest_auth.py index 35f462f180b..c1ed7ca0fdd 100644 --- a/aiohttp/client_middleware_digest_auth.py +++ b/aiohttp/client_middleware_digest_auth.py @@ -245,7 +245,9 @@ async def _encode( ) qop_raw = challenge.get("qop", "") - algorithm = challenge.get("algorithm", "MD5").upper() + # Preserve original algorithm case for response while using uppercase for processing + algorithm_original = challenge.get("algorithm", "MD5") + algorithm = algorithm_original.upper() opaque = challenge.get("opaque", "") # Convert string values to bytes once @@ -342,7 +344,7 @@ def KD(s: bytes, d: bytes) -> bytes: "nonce": escape_quotes(nonce), "uri": path, "response": response_digest.decode(), - "algorithm": algorithm, + "algorithm": algorithm_original, } # Optional fields diff --git a/tests/test_client_middleware_digest_auth.py b/tests/test_client_middleware_digest_auth.py index 16959aecdf4..2059bfea337 100644 --- a/tests/test_client_middleware_digest_auth.py +++ b/tests/test_client_middleware_digest_auth.py @@ -1,6 +1,7 @@ """Test digest authentication middleware for aiohttp client.""" import io +import re from hashlib import md5, sha1 from typing import Generator, Literal, Union from unittest import mock @@ -211,6 +212,48 @@ async def test_encode_unsupported_algorithm( await digest_auth_mw._encode("GET", URL("http://example.com/resource"), b"") +@pytest.mark.parametrize("algorithm", ["MD5", "MD5-SESS", "SHA-256"]) +async def test_encode_algorithm_case_preservation_uppercase( + digest_auth_mw: DigestAuthMiddleware, + qop_challenge: DigestAuthChallenge, + algorithm: str, +) -> None: + """Test that uppercase algorithm case is preserved in the response header.""" + # Create a challenge with the specific algorithm case + challenge = qop_challenge.copy() + challenge["algorithm"] = algorithm + digest_auth_mw._challenge = challenge + + header = await digest_auth_mw._encode( + "GET", URL("http://example.com/resource"), b"" + ) + + # The algorithm in the response should match the exact case from the challenge + assert f"algorithm={algorithm}" in header + + +@pytest.mark.parametrize("algorithm", ["md5", "MD5-sess", "sha-256"]) +async def test_encode_algorithm_case_preservation_lowercase( + digest_auth_mw: DigestAuthMiddleware, + qop_challenge: DigestAuthChallenge, + algorithm: str, +) -> None: + """Test that lowercase/mixed-case algorithm is preserved in the response header.""" + # Create a challenge with the specific algorithm case + challenge = qop_challenge.copy() + challenge["algorithm"] = algorithm + digest_auth_mw._challenge = challenge + + header = await digest_auth_mw._encode( + "GET", URL("http://example.com/resource"), b"" + ) + + # The algorithm in the response should match the exact case from the challenge + assert f"algorithm={algorithm}" in header + # Also verify it's not the uppercase version + assert f"algorithm={algorithm.upper()}" not in header + + async def test_invalid_qop_rejected( digest_auth_mw: DigestAuthMiddleware, basic_challenge: DigestAuthChallenge ) -> None: @@ -1231,3 +1274,55 @@ def test_in_protection_space_multiple_spaces( digest_auth_mw._in_protection_space(URL("http://example.com/secure")) is False ) assert digest_auth_mw._in_protection_space(URL("http://example.com/other")) is False + + +async def test_case_sensitive_algorithm_server( + aiohttp_server: AiohttpServer, +) -> None: + """Test authentication with a server that requires exact algorithm case matching. + + This simulates servers like Prusa printers that expect the algorithm + to be returned with the exact same case as sent in the challenge. + """ + digest_auth_mw = DigestAuthMiddleware("testuser", "testpass") + request_count = 0 + auth_algorithms: list[str] = [] + + async def handler(request: Request) -> Response: + nonlocal request_count + request_count += 1 + + if not (auth_header := request.headers.get(hdrs.AUTHORIZATION)): + # Send challenge with lowercase-sess algorithm (like Prusa) + challenge = 'Digest realm="Administrator", nonce="test123", qop="auth", algorithm="MD5-sess", opaque="xyz123"' + return Response( + status=401, + headers={"WWW-Authenticate": challenge}, + text="Unauthorized", + ) + + # Extract algorithm from auth response + algo_match = re.search(r"algorithm=([^,\s]+)", auth_header) + assert algo_match is not None + auth_algorithms.append(algo_match.group(1)) + + # Case-sensitive server: only accept exact case match + assert "algorithm=MD5-sess" in auth_header + return Response(text="Success") + + app = Application() + app.router.add_get("/api/test", handler) + server = await aiohttp_server(app) + + async with ( + ClientSession(middlewares=(digest_auth_mw,)) as session, + session.get(server.make_url("/api/test")) as resp, + ): + assert resp.status == 200 + text = await resp.text() + assert text == "Success" + + # Verify the middleware preserved the exact algorithm case + assert request_count == 2 # Initial 401 + successful retry + assert len(auth_algorithms) == 1 + assert auth_algorithms[0] == "MD5-sess" # Not "MD5-SESS"