Skip to content

Commit cfffc9c

Browse files
committed
apiserver: set proxy auth info via request header
Signed-off-by: Iceber Gu <[email protected]>
1 parent 3fd0234 commit cfffc9c

File tree

7 files changed

+218
-136
lines changed

7 files changed

+218
-136
lines changed

pkg/kubeapiserver/apiserver.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ func NewDefaultConfig() *Config {
6666
}
6767

6868
type ExtraConfig struct {
69-
AllowedProxySubresources map[schema.GroupResource]sets.Set[string]
69+
AllowPediaClusterConfigReuse bool
70+
ExtraProxyRequestHeaderPrefixes []string
71+
AllowedProxySubresources map[schema.GroupResource]sets.Set[string]
7072
}
7173

7274
type Config struct {
@@ -149,15 +151,17 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
149151
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
150152
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)
151153

152-
controller := NewClusterResourceController(restManager, discoveryManager, c.InformerFactory.Cluster().V1alpha2().PediaClusters())
154+
clusterInformer := c.InformerFactory.Cluster().V1alpha2().PediaClusters()
155+
_ = NewClusterResourceController(restManager, discoveryManager, clusterInformer)
156+
157+
connector := proxyrest.NewProxyConnector(clusterInformer.Lister(), c.ExtraConfig.AllowPediaClusterConfigReuse, c.ExtraConfig.ExtraProxyRequestHeaderPrefixes)
153158

154159
methodSet := sets.New("GET")
155-
for _, rest := range proxyrest.GetSubresourceRESTs(controller) {
160+
for _, rest := range proxyrest.GetSubresourceRESTs(connector) {
156161
allows := c.ExtraConfig.AllowedProxySubresources[rest.ParentGroupResource()]
157162
if allows == nil || !allows.Has(rest.Subresource()) {
158163
continue
159164
}
160-
161165
if err := restManager.preRegisterSubresource(subresource{
162166
gr: rest.ParentGroupResource(),
163167
kind: rest.ParentKind(),
@@ -178,7 +182,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
178182
}
179183
}
180184

181-
resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, controller)
185+
resourceHandler.proxy = proxyrest.NewRemoteProxyREST(c.GenericConfig.Serializer, connector)
182186
return genericserver, methods, nil
183187
}
184188

pkg/kubeapiserver/clusterresource_controller.go

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
package kubeapiserver
22

33
import (
4-
"context"
5-
"errors"
6-
"net/http"
74
"reflect"
85

96
"k8s.io/apimachinery/pkg/runtime/schema"
107
"k8s.io/apimachinery/pkg/util/sets"
11-
"k8s.io/client-go/rest"
128
"k8s.io/client-go/tools/cache"
13-
"k8s.io/client-go/tools/clientcmd"
149
"k8s.io/klog/v2"
1510

1611
clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
@@ -107,66 +102,6 @@ func (c *ClusterResourceController) removeCluster(name string) {
107102
delete(c.clusterresources, name)
108103
}
109104

110-
func (c *ClusterResourceController) resolveClusterRestConfig(name string) (*rest.Config, error) {
111-
cluster, err := c.clusterLister.Get(name)
112-
if err != nil {
113-
return nil, err
114-
}
115-
116-
if len(cluster.Spec.Kubeconfig) != 0 {
117-
clientconfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Kubeconfig)
118-
if err != nil {
119-
return nil, err
120-
}
121-
return clientconfig.ClientConfig()
122-
}
123-
124-
if cluster.Spec.APIServer == "" {
125-
return nil, errors.New("Cluster APIServer Endpoint is required")
126-
}
127-
128-
if len(cluster.Spec.TokenData) == 0 &&
129-
(len(cluster.Spec.CertData) == 0 || len(cluster.Spec.KeyData) == 0) {
130-
return nil, errors.New("Cluster APIServer's Token or Cert is required")
131-
}
132-
133-
config := &rest.Config{
134-
Host: cluster.Spec.APIServer,
135-
}
136-
137-
if len(cluster.Spec.CAData) != 0 {
138-
config.TLSClientConfig.CAData = cluster.Spec.CAData
139-
} else {
140-
config.TLSClientConfig.Insecure = true
141-
}
142-
143-
if len(cluster.Spec.CertData) != 0 && len(cluster.Spec.KeyData) != 0 {
144-
config.TLSClientConfig.CertData = cluster.Spec.CertData
145-
config.TLSClientConfig.KeyData = cluster.Spec.KeyData
146-
}
147-
148-
if len(cluster.Spec.TokenData) != 0 {
149-
config.BearerToken = string(cluster.Spec.TokenData)
150-
}
151-
return config, nil
152-
}
153-
154-
func (c *ClusterResourceController) GetClusterDefaultConnection(ctx context.Context, name string) (string, http.RoundTripper, error) {
155-
config, err := c.resolveClusterRestConfig(name)
156-
if err != nil {
157-
return "", nil, err
158-
}
159-
transport, err := rest.TransportFor(config)
160-
if err != nil {
161-
return "", nil, err
162-
}
163-
return config.Host, transport, nil
164-
}
165-
166-
func (c *ClusterResourceController) GetClusterConnectionWithTLSConfig(ctx context.Context, name string) (string, http.RoundTripper, error) {
167-
return "", nil, errors.New("CetClusterConnectionWithTLSConfig not implemented")
168-
}
169-
170105
type resourceInfo struct {
171106
Namespaced bool
172107
Kind string

pkg/kubeapiserver/options.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@ import (
44
"fmt"
55
"strings"
66

7+
proxyrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/proxy"
78
"github.com/spf13/pflag"
89
"k8s.io/apimachinery/pkg/runtime/schema"
910
"k8s.io/apimachinery/pkg/util/sets"
1011
)
1112

1213
type Options struct {
13-
AllowedProxySubresources []string
14+
AllowPediaClusterConfigForProxyRequest bool
15+
AllowedProxySubresources []string
16+
ExtraProxyRequestHeaderPrefixes []string
1417
}
1518

1619
func NewOptions() *Options {
17-
return &Options{}
20+
return &Options{
21+
ExtraProxyRequestHeaderPrefixes: []string{proxyrest.DefaultProxyRequestHeaderPrefix},
22+
}
1823
}
1924

2025
func (o *Options) AddFlags(fs *pflag.FlagSet) {
@@ -33,6 +38,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
3338
"List of subresources that support proxying requests to the specified cluster, formatted as '[resource/subresource],[subresource],...'. "+
3439
fmt.Sprintf("Supported proxy subresources include %q", strings.Join(resources, ",")),
3540
)
41+
42+
fs.BoolVar(&o.AllowPediaClusterConfigForProxyRequest, "allow-pediacluster-config-for-proxy-request", o.AllowPediaClusterConfigForProxyRequest, ""+
43+
"Allow proxy requests to use the cluster configuration from PediaCluster when authentication information cannot be obtained from the header.",
44+
)
3645
}
3746

3847
var supportedProxyCoreSubresources = map[string][]string{
@@ -75,5 +84,8 @@ func (o *Options) Config() (*ExtraConfig, error) {
7584
return nil, fmt.Errorf("--allowed-proxy-subresources: unsupported subresources or invalid format %q", subresource)
7685
}
7786
}
78-
return &ExtraConfig{AllowedProxySubresources: subresources}, nil
87+
return &ExtraConfig{
88+
AllowPediaClusterConfigReuse: o.AllowPediaClusterConfigForProxyRequest,
89+
AllowedProxySubresources: subresources,
90+
}, nil
7991
}

pkg/kubeapiserver/resourcerest/proxy/proxy.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ import (
1919
)
2020

2121
type ClusterConnectionGetter interface {
22-
GetClusterDefaultConnection(ctx context.Context, cluster string) (string, http.RoundTripper, error)
23-
GetClusterConnectionWithTLSConfig(ctx context.Context, cluster string) (string, http.RoundTripper, error)
22+
GetClusterConnection(ctx context.Context, cluster string, req *http.Request) (string, http.RoundTripper, error)
2423
}
2524

2625
type RemoteProxyREST struct {
@@ -33,7 +32,7 @@ func NewRemoteProxyREST(serializer runtime.NegotiatedSerializer, connGetter Clus
3332
}
3433

3534
func (r *RemoteProxyREST) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
36-
handler, err := proxyConn(req.Context(), r.connGetter, false, r, nil)
35+
handler, err := proxyConn(req.Context(), r.connGetter, false, false, r, nil)
3736
if err != nil {
3837
r.Error(rw, req, err)
3938
}
@@ -55,26 +54,28 @@ func proxyConn(ctx context.Context, connGetter ClusterConnectionGetter, upgradeR
5554
return nil, errors.New("missing RequestInfo")
5655
}
5756

58-
// TODO(iceber): need disconnect when the cluster authentication information changes
59-
endpoint, transport, err := connGetter.GetClusterDefaultConnection(ctx, clusterName)
60-
if err != nil {
61-
return nil, err
62-
}
57+
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
58+
// TODO(iceber): need disconnect when the cluster authentication information changes
59+
endpoint, transport, err := connGetter.GetClusterConnection(ctx, clusterName, req)
60+
if err != nil {
61+
responder.Error(rw, req, err)
62+
return
63+
}
6364

64-
target, err := url.ParseRequestURI(endpoint + requestInfo.Path)
65-
if err != nil {
66-
return nil, err
67-
}
68-
target.RawQuery = request.RequestQueryFrom(ctx).Encode()
65+
target, err := url.ParseRequestURI(endpoint + requestInfo.Path)
66+
if err != nil {
67+
responder.Error(rw, req, err)
68+
return
69+
}
70+
target.RawQuery = request.RequestQueryFrom(ctx).Encode()
6971

70-
proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder)
71-
proxy.UseLocationHost = true
72+
proxy := proxy.NewUpgradeAwareHandler(target, transport, false, upgradeRequired, responder)
73+
proxy.UseLocationHost = true
7274

73-
var handler http.Handler = proxy
74-
if wrapProxy != nil {
75-
handler = wrapProxy(proxy)
76-
}
77-
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
75+
var handler http.Handler = proxy
76+
if wrapProxy != nil {
77+
handler = wrapProxy(proxy)
78+
}
7879
r := req.WithContext(req.Context())
7980
r.Header = utilnet.CloneHeader(req.Header)
8081
if auditID, _ := audit.AuditIDFrom(ctx); auditID != "" {
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"errors"
7+
"net/http"
8+
"net/url"
9+
"strings"
10+
11+
"k8s.io/client-go/rest"
12+
13+
clusterlister "github.com/clusterpedia-io/clusterpedia/pkg/generated/listers/cluster/v1alpha2"
14+
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
15+
)
16+
17+
const DefaultProxyRequestHeaderPrefix = "X-Clusterpedia-Proxy-"
18+
19+
type Connector struct {
20+
allowConfigReuse bool
21+
extraHeaderPrefixes []string
22+
clusterLister clusterlister.PediaClusterLister
23+
}
24+
25+
func NewProxyConnector(clusterLister clusterlister.PediaClusterLister, allowPediaClusterConfigReuse bool, extraHeaderPrefixes []string) ClusterConnectionGetter {
26+
if len(extraHeaderPrefixes) == 0 {
27+
extraHeaderPrefixes = []string{DefaultProxyRequestHeaderPrefix}
28+
}
29+
return &Connector{
30+
allowConfigReuse: allowPediaClusterConfigReuse,
31+
extraHeaderPrefixes: extraHeaderPrefixes,
32+
clusterLister: clusterLister,
33+
}
34+
}
35+
36+
func (c *Connector) GetClusterConnection(ctx context.Context, name string, req *http.Request) (string, http.RoundTripper, error) {
37+
cluster, err := c.clusterLister.Get(name)
38+
if err != nil {
39+
return "", nil, err
40+
}
41+
config := &rest.Config{}
42+
if cluster.Status.APIServer != "" {
43+
config.Host = cluster.Status.APIServer
44+
} else if cluster.Spec.APIServer != "" {
45+
config.Host = cluster.Spec.APIServer
46+
} else {
47+
return "", nil, errors.New(".Spec.APIServer and .Status.APIServer are empty")
48+
}
49+
config.TLSClientConfig.Insecure = true
50+
51+
var authInHeader bool
52+
extra := map[string][]string{}
53+
54+
headers := req.Header.Clone()
55+
for _, prefix := range c.extraHeaderPrefixes {
56+
for headerName, vv := range headers {
57+
if !(len(headerName) >= len(prefix) && strings.EqualFold(headerName[:len(prefix)], prefix)) {
58+
continue
59+
}
60+
61+
extraKey := unescapeExtraKey(strings.ToLower(headerName[len(prefix):]))
62+
extra[extraKey] = append(extra[extraKey], vv...)
63+
req.Header.Del(headerName)
64+
}
65+
}
66+
67+
for key, vals := range extra {
68+
switch key {
69+
case "ca":
70+
authInHeader = true
71+
if len(vals) > 0 {
72+
config.TLSClientConfig.CAData, err = base64.StdEncoding.DecodeString(vals[0])
73+
if err != nil {
74+
return "", nil, err
75+
}
76+
config.TLSClientConfig.Insecure = false
77+
}
78+
case "token":
79+
authInHeader = true
80+
if len(vals) > 0 {
81+
config.BearerToken = vals[0]
82+
}
83+
case "client-cert":
84+
authInHeader = true
85+
if len(vals) > 0 {
86+
config.TLSClientConfig.CertData, err = base64.StdEncoding.DecodeString(vals[0])
87+
if err != nil {
88+
return "", nil, err
89+
}
90+
}
91+
case "client-key":
92+
authInHeader = true
93+
if len(vals) > 0 {
94+
config.TLSClientConfig.KeyData, err = base64.StdEncoding.DecodeString(vals[0])
95+
if err != nil {
96+
return "", nil, err
97+
}
98+
}
99+
}
100+
}
101+
102+
if !authInHeader && c.allowConfigReuse {
103+
config, err = utils.BuildClusterRestConfig(cluster)
104+
if err != nil {
105+
return "", nil, err
106+
}
107+
}
108+
109+
transport, err := rest.TransportFor(config)
110+
if err != nil {
111+
return "", nil, err
112+
}
113+
return config.Host, transport, nil
114+
}
115+
116+
func unescapeExtraKey(encodedKey string) string {
117+
key, err := url.PathUnescape(encodedKey) // Decode %-encoded bytes.
118+
if err != nil {
119+
return encodedKey // Always record extra strings, even if malformed/unencoded.
120+
}
121+
return key
122+
}

0 commit comments

Comments
 (0)