forked from kedacore/http-add-on
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouting.go
117 lines (100 loc) · 3.19 KB
/
routing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package middleware
import (
"context"
"fmt"
"net/http"
"net/url"
"regexp"
"github.com/kedacore/http-add-on/interceptor/handler"
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/kedacore/http-add-on/pkg/util"
)
var (
kubernetesProbeUserAgent = regexp.MustCompile(`(^|\s)kube-probe/`)
googleHCUserAgent = regexp.MustCompile(`(^|\s)GoogleHC/`)
awsELBserAgent = regexp.MustCompile(`(^|\s)ELB-HealthChecker/`)
)
type Routing struct {
routingTable routing.Table
probeHandler http.Handler
upstreamHandler http.Handler
svcCache k8s.ServiceCache
tlsEnabled bool
}
func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, svcCache k8s.ServiceCache, tlsEnabled bool) *Routing {
return &Routing{
routingTable: routingTable,
probeHandler: probeHandler,
upstreamHandler: upstreamHandler,
svcCache: svcCache,
tlsEnabled: tlsEnabled,
}
}
var _ http.Handler = (*Routing)(nil)
func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = util.RequestWithLoggerWithName(r, "RoutingMiddleware")
httpso := rm.routingTable.Route(r)
if httpso == nil {
if rm.isProbe(r) {
rm.probeHandler.ServeHTTP(w, r)
return
}
sh := handler.NewStatic(http.StatusNotFound, nil)
sh.ServeHTTP(w, r)
return
}
r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))
stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
if err != nil {
sh := handler.NewStatic(http.StatusInternalServerError, err)
sh.ServeHTTP(w, r)
return
}
r = r.WithContext(util.ContextWithStream(r.Context(), stream))
rm.upstreamHandler.ServeHTTP(w, r)
}
func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
if httpso.Spec.ScaleTargetRef.Port != 0 {
return httpso.Spec.ScaleTargetRef.Port, nil
}
if httpso.Spec.ScaleTargetRef.PortName == "" {
return 0, fmt.Errorf(`must specify either "port" or "portName"`)
}
svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
if err != nil {
return 0, fmt.Errorf("failed to get Service: %w", err)
}
for _, port := range svc.Spec.Ports {
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
return port.Port, nil
}
}
return 0, fmt.Errorf("portName %q not found in Service", httpso.Spec.ScaleTargetRef.PortName)
}
func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
port, err := rm.getPort(ctx, httpso)
if err != nil {
return nil, fmt.Errorf("failed to get port: %w", err)
}
if rm.tlsEnabled {
return url.Parse(fmt.Sprintf(
"https://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
port,
))
}
//goland:noinspection HttpUrlsUsage
return url.Parse(fmt.Sprintf(
"http://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
port,
))
}
func (rm *Routing) isProbe(r *http.Request) bool {
ua := r.UserAgent()
return kubernetesProbeUserAgent.Match([]byte(ua)) || googleHCUserAgent.Match([]byte(ua)) || awsELBserAgent.Match([]byte(ua))
}