2
2
import getpass
3
3
import logging
4
4
from typing import TYPE_CHECKING , Callable , Optional , Type , cast
5
+ from urllib .parse import urlparse
6
+
7
+ import requests
5
8
6
9
# keyring has an indirect dependency on PyCA cryptography, which has no
7
10
# pre-built wheels for ppc64le and s390x, see #1158.
@@ -28,9 +31,12 @@ def __init__(
28
31
29
32
30
33
class Resolver :
31
- def __init__ (self , config : utils .RepositoryConfig , input : CredentialInput ) -> None :
34
+ def __init__ (
35
+ self , config : utils .RepositoryConfig , input : CredentialInput , oidc : bool = False
36
+ ) -> None :
32
37
self .config = config
33
38
self .input = input
39
+ self .oidc = oidc
34
40
35
41
@classmethod
36
42
def choose (cls , interactive : bool ) -> Type ["Resolver" ]:
@@ -53,13 +59,41 @@ def username(self) -> Optional[str]:
53
59
@property
54
60
@functools .lru_cache ()
55
61
def password (self ) -> Optional [str ]:
62
+ if self .oidc :
63
+ # Trusted publishing (OpenID Connect): get one token from the CI
64
+ # system, and exchange that for a PyPI token.
65
+ from id import detect_credential
66
+
67
+ repository_domain = urlparse (self .system ).netloc
68
+ audience = self ._oidc_audience (repository_domain )
69
+ oidc_token = detect_credential (audience )
70
+
71
+ token_exchange_url = f'https://{ repository_domain } /_/oidc/mint-token'
72
+
73
+ mint_token_resp = requests .post (
74
+ token_exchange_url ,
75
+ json = {'token' : oidc_token },
76
+ timeout = 5 , # S113 wants a timeout
77
+ )
78
+ mint_token_resp .raise_for_status ()
79
+ return mint_token_resp .json ()['token' ]
80
+
56
81
return utils .get_userpass_value (
57
82
self .input .password ,
58
83
self .config ,
59
84
key = "password" ,
60
85
prompt_strategy = self .password_from_keyring_or_prompt ,
61
86
)
62
87
88
+ @staticmethod
89
+ def _oidc_audience (repository_domain ):
90
+ # Indices are expected to support `https://{domain}/_/oidc/audience`,
91
+ # which tells OIDC exchange clients which audience to use.
92
+ audience_url = f'https://{ repository_domain } /_/oidc/audience'
93
+ resp = requests .get (audience_url , timeout = 5 )
94
+ resp .raise_for_status ()
95
+ return resp .json ()['audience' ]
96
+
63
97
@property
64
98
def system (self ) -> Optional [str ]:
65
99
return self .config ["repository" ]
0 commit comments