|
1 | 1 | """Test digest authentication middleware for aiohttp client."""
|
2 | 2 |
|
3 | 3 | import io
|
| 4 | +import re |
4 | 5 | from hashlib import md5, sha1
|
5 | 6 | from typing import Generator, Literal, Union
|
6 | 7 | from unittest import mock
|
@@ -211,6 +212,48 @@ async def test_encode_unsupported_algorithm(
|
211 | 212 | await digest_auth_mw._encode("GET", URL("http://example.com/resource"), b"")
|
212 | 213 |
|
213 | 214 |
|
| 215 | +@pytest.mark.parametrize("algorithm", ["MD5", "MD5-SESS", "SHA-256"]) |
| 216 | +async def test_encode_algorithm_case_preservation_uppercase( |
| 217 | + digest_auth_mw: DigestAuthMiddleware, |
| 218 | + qop_challenge: DigestAuthChallenge, |
| 219 | + algorithm: str, |
| 220 | +) -> None: |
| 221 | + """Test that uppercase algorithm case is preserved in the response header.""" |
| 222 | + # Create a challenge with the specific algorithm case |
| 223 | + challenge = qop_challenge.copy() |
| 224 | + challenge["algorithm"] = algorithm |
| 225 | + digest_auth_mw._challenge = challenge |
| 226 | + |
| 227 | + header = await digest_auth_mw._encode( |
| 228 | + "GET", URL("http://example.com/resource"), b"" |
| 229 | + ) |
| 230 | + |
| 231 | + # The algorithm in the response should match the exact case from the challenge |
| 232 | + assert f"algorithm={algorithm}" in header |
| 233 | + |
| 234 | + |
| 235 | +@pytest.mark.parametrize("algorithm", ["md5", "MD5-sess", "sha-256"]) |
| 236 | +async def test_encode_algorithm_case_preservation_lowercase( |
| 237 | + digest_auth_mw: DigestAuthMiddleware, |
| 238 | + qop_challenge: DigestAuthChallenge, |
| 239 | + algorithm: str, |
| 240 | +) -> None: |
| 241 | + """Test that lowercase/mixed-case algorithm is preserved in the response header.""" |
| 242 | + # Create a challenge with the specific algorithm case |
| 243 | + challenge = qop_challenge.copy() |
| 244 | + challenge["algorithm"] = algorithm |
| 245 | + digest_auth_mw._challenge = challenge |
| 246 | + |
| 247 | + header = await digest_auth_mw._encode( |
| 248 | + "GET", URL("http://example.com/resource"), b"" |
| 249 | + ) |
| 250 | + |
| 251 | + # The algorithm in the response should match the exact case from the challenge |
| 252 | + assert f"algorithm={algorithm}" in header |
| 253 | + # Also verify it's not the uppercase version |
| 254 | + assert f"algorithm={algorithm.upper()}" not in header |
| 255 | + |
| 256 | + |
214 | 257 | async def test_invalid_qop_rejected(
|
215 | 258 | digest_auth_mw: DigestAuthMiddleware, basic_challenge: DigestAuthChallenge
|
216 | 259 | ) -> None:
|
@@ -1231,3 +1274,55 @@ def test_in_protection_space_multiple_spaces(
|
1231 | 1274 | digest_auth_mw._in_protection_space(URL("http://example.com/secure")) is False
|
1232 | 1275 | )
|
1233 | 1276 | assert digest_auth_mw._in_protection_space(URL("http://example.com/other")) is False
|
| 1277 | + |
| 1278 | + |
| 1279 | +async def test_case_sensitive_algorithm_server( |
| 1280 | + aiohttp_server: AiohttpServer, |
| 1281 | +) -> None: |
| 1282 | + """Test authentication with a server that requires exact algorithm case matching. |
| 1283 | +
|
| 1284 | + This simulates servers like Prusa printers that expect the algorithm |
| 1285 | + to be returned with the exact same case as sent in the challenge. |
| 1286 | + """ |
| 1287 | + digest_auth_mw = DigestAuthMiddleware("testuser", "testpass") |
| 1288 | + request_count = 0 |
| 1289 | + auth_algorithms: list[str] = [] |
| 1290 | + |
| 1291 | + async def handler(request: Request) -> Response: |
| 1292 | + nonlocal request_count |
| 1293 | + request_count += 1 |
| 1294 | + |
| 1295 | + if not (auth_header := request.headers.get(hdrs.AUTHORIZATION)): |
| 1296 | + # Send challenge with lowercase-sess algorithm (like Prusa) |
| 1297 | + challenge = 'Digest realm="Administrator", nonce="test123", qop="auth", algorithm="MD5-sess", opaque="xyz123"' |
| 1298 | + return Response( |
| 1299 | + status=401, |
| 1300 | + headers={"WWW-Authenticate": challenge}, |
| 1301 | + text="Unauthorized", |
| 1302 | + ) |
| 1303 | + |
| 1304 | + # Extract algorithm from auth response |
| 1305 | + algo_match = re.search(r"algorithm=([^,\s]+)", auth_header) |
| 1306 | + assert algo_match is not None |
| 1307 | + auth_algorithms.append(algo_match.group(1)) |
| 1308 | + |
| 1309 | + # Case-sensitive server: only accept exact case match |
| 1310 | + assert "algorithm=MD5-sess" in auth_header |
| 1311 | + return Response(text="Success") |
| 1312 | + |
| 1313 | + app = Application() |
| 1314 | + app.router.add_get("/api/test", handler) |
| 1315 | + server = await aiohttp_server(app) |
| 1316 | + |
| 1317 | + async with ( |
| 1318 | + ClientSession(middlewares=(digest_auth_mw,)) as session, |
| 1319 | + session.get(server.make_url("/api/test")) as resp, |
| 1320 | + ): |
| 1321 | + assert resp.status == 200 |
| 1322 | + text = await resp.text() |
| 1323 | + assert text == "Success" |
| 1324 | + |
| 1325 | + # Verify the middleware preserved the exact algorithm case |
| 1326 | + assert request_count == 2 # Initial 401 + successful retry |
| 1327 | + assert len(auth_algorithms) == 1 |
| 1328 | + assert auth_algorithms[0] == "MD5-sess" # Not "MD5-SESS" |
0 commit comments