Skip to content

Fix algorithm case preservation in DigestAuthMiddleware #11352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/11352.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed :class:`~aiohttp.DigestAuthMiddleware` to preserve the algorithm case from the server's challenge in the authorization response. This improves compatibility with servers that perform case-sensitive algorithm matching (e.g., servers expecting ``algorithm=MD5-sess`` instead of ``algorithm=MD5-SESS``)
-- by :user:`bdraco`.
6 changes: 4 additions & 2 deletions aiohttp/client_middleware_digest_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ async def _encode(
)

qop_raw = challenge.get("qop", "")
algorithm = challenge.get("algorithm", "MD5").upper()
# Preserve original algorithm case for response while using uppercase for processing
algorithm_original = challenge.get("algorithm", "MD5")
algorithm = algorithm_original.upper()
opaque = challenge.get("opaque", "")

# Convert string values to bytes once
Expand Down Expand Up @@ -342,7 +344,7 @@ def KD(s: bytes, d: bytes) -> bytes:
"nonce": escape_quotes(nonce),
"uri": path,
"response": response_digest.decode(),
"algorithm": algorithm,
"algorithm": algorithm_original,
}

# Optional fields
Expand Down
95 changes: 95 additions & 0 deletions tests/test_client_middleware_digest_auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test digest authentication middleware for aiohttp client."""

import io
import re
from hashlib import md5, sha1
from typing import Generator, Literal, Union
from unittest import mock
Expand Down Expand Up @@ -211,6 +212,48 @@ async def test_encode_unsupported_algorithm(
await digest_auth_mw._encode("GET", URL("http://example.com/resource"), b"")


@pytest.mark.parametrize("algorithm", ["MD5", "MD5-SESS", "SHA-256"])
async def test_encode_algorithm_case_preservation_uppercase(
digest_auth_mw: DigestAuthMiddleware,
qop_challenge: DigestAuthChallenge,
algorithm: str,
) -> None:
"""Test that uppercase algorithm case is preserved in the response header."""
# Create a challenge with the specific algorithm case
challenge = qop_challenge.copy()
challenge["algorithm"] = algorithm
digest_auth_mw._challenge = challenge

header = await digest_auth_mw._encode(
"GET", URL("http://example.com/resource"), b""
)

# The algorithm in the response should match the exact case from the challenge
assert f"algorithm={algorithm}" in header


@pytest.mark.parametrize("algorithm", ["md5", "MD5-sess", "sha-256"])
async def test_encode_algorithm_case_preservation_lowercase(
digest_auth_mw: DigestAuthMiddleware,
qop_challenge: DigestAuthChallenge,
algorithm: str,
) -> None:
"""Test that lowercase/mixed-case algorithm is preserved in the response header."""
# Create a challenge with the specific algorithm case
challenge = qop_challenge.copy()
challenge["algorithm"] = algorithm
digest_auth_mw._challenge = challenge

header = await digest_auth_mw._encode(
"GET", URL("http://example.com/resource"), b""
)

# The algorithm in the response should match the exact case from the challenge
assert f"algorithm={algorithm}" in header
# Also verify it's not the uppercase version
assert f"algorithm={algorithm.upper()}" not in header


async def test_invalid_qop_rejected(
digest_auth_mw: DigestAuthMiddleware, basic_challenge: DigestAuthChallenge
) -> None:
Expand Down Expand Up @@ -1231,3 +1274,55 @@ def test_in_protection_space_multiple_spaces(
digest_auth_mw._in_protection_space(URL("http://example.com/secure")) is False
)
assert digest_auth_mw._in_protection_space(URL("http://example.com/other")) is False


async def test_case_sensitive_algorithm_server(
aiohttp_server: AiohttpServer,
) -> None:
"""Test authentication with a server that requires exact algorithm case matching.

This simulates servers like Prusa printers that expect the algorithm
to be returned with the exact same case as sent in the challenge.
"""
digest_auth_mw = DigestAuthMiddleware("testuser", "testpass")
request_count = 0
auth_algorithms: list[str] = []

async def handler(request: Request) -> Response:
nonlocal request_count
request_count += 1

if not (auth_header := request.headers.get(hdrs.AUTHORIZATION)):
# Send challenge with lowercase-sess algorithm (like Prusa)
challenge = 'Digest realm="Administrator", nonce="test123", qop="auth", algorithm="MD5-sess", opaque="xyz123"'
return Response(
status=401,
headers={"WWW-Authenticate": challenge},
text="Unauthorized",
)

# Extract algorithm from auth response
algo_match = re.search(r"algorithm=([^,\s]+)", auth_header)
assert algo_match is not None
auth_algorithms.append(algo_match.group(1))

# Case-sensitive server: only accept exact case match
assert "algorithm=MD5-sess" in auth_header
return Response(text="Success")

app = Application()
app.router.add_get("/api/test", handler)
server = await aiohttp_server(app)

async with (
ClientSession(middlewares=(digest_auth_mw,)) as session,
session.get(server.make_url("/api/test")) as resp,
):
assert resp.status == 200
text = await resp.text()
assert text == "Success"

# Verify the middleware preserved the exact algorithm case
assert request_count == 2 # Initial 401 + successful retry
assert len(auth_algorithms) == 1
assert auth_algorithms[0] == "MD5-sess" # Not "MD5-SESS"
Loading