@@ -3,17 +3,24 @@ package route
3
3
import (
4
4
"fmt"
5
5
"sort"
6
+ "time"
6
7
7
8
mapset "github.com/deckarep/golang-set"
8
9
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
10
+ xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
9
11
xds_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
12
+ xds_local_ratelimit "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/local_ratelimit/v3"
10
13
xds_matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
14
+ xds_type "github.com/envoyproxy/go-control-plane/envoy/type/v3"
15
+ "github.com/golang/protobuf/ptypes/any"
11
16
"github.com/golang/protobuf/ptypes/duration"
12
17
"github.com/golang/protobuf/ptypes/wrappers"
18
+ "github.com/pkg/errors"
19
+ "google.golang.org/protobuf/types/known/anypb"
13
20
"google.golang.org/protobuf/types/known/durationpb"
14
21
"google.golang.org/protobuf/types/known/wrapperspb"
15
22
16
- "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
23
+ policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
17
24
18
25
"github.com/openservicemesh/osm/pkg/configurator"
19
26
"github.com/openservicemesh/osm/pkg/constants"
@@ -56,6 +63,8 @@ const (
56
63
57
64
// authorityHeaderKey is the key corresponding to the HTTP Host/Authority header programmed as a header matcher in an Envoy route
58
65
authorityHeaderKey = ":authority"
66
+
67
+ httpLocalRateLimiterStatsPrefix = "http_local_rate_limiter"
59
68
)
60
69
61
70
// BuildInboundMeshRouteConfiguration constructs the Envoy constructs ([]*xds_route.RouteConfiguration) for implementing inbound and outbound routes
@@ -70,6 +79,7 @@ func BuildInboundMeshRouteConfiguration(portSpecificRouteConfigs map[int][]*traf
70
79
for _ , config := range configs {
71
80
virtualHost := buildVirtualHostStub (inboundVirtualHost , config .Name , config .Hostnames )
72
81
virtualHost .Routes = buildInboundRoutes (config .Rules , trustDomain )
82
+ applyInboundVirtualHostConfig (virtualHost , config )
73
83
routeConfig .VirtualHosts = append (routeConfig .VirtualHosts , virtualHost )
74
84
}
75
85
if featureFlags := cfg .GetFeatureFlags (); featureFlags .EnableWASMStats {
@@ -88,6 +98,100 @@ func BuildInboundMeshRouteConfiguration(portSpecificRouteConfigs map[int][]*traf
88
98
return routeConfigs
89
99
}
90
100
101
+ // applyInboundVirtualHostConfig updates the VirtualHost configuration based on the given policy
102
+ func applyInboundVirtualHostConfig (vhost * xds_route.VirtualHost , policy * trafficpolicy.InboundTrafficPolicy ) {
103
+ if vhost == nil || policy == nil {
104
+ return
105
+ }
106
+
107
+ config := make (map [string ]* any.Any )
108
+
109
+ // Apply VirtualHost level rate limiting config
110
+ if policy .RateLimit != nil && policy .RateLimit .Local != nil && policy .RateLimit .Local .HTTP != nil {
111
+ if filter , err := getLocalRateLimitFilterConfig (policy .RateLimit .Local .HTTP ); err != nil {
112
+ log .Error ().Err (err ).Msgf ("Error applying local rate limiting config for vhost %s, ignoring it" , vhost .Name )
113
+ } else {
114
+ config [envoy .HTTPLocalRateLimitFilterName ] = filter
115
+ }
116
+ }
117
+ // Add other typed filter configs below when necessary
118
+
119
+ vhost .TypedPerFilterConfig = config
120
+ }
121
+
122
+ // getLocalRateLimitFilterConfig returns the marshalled HTTP local rate limiting config for the given policy
123
+ func getLocalRateLimitFilterConfig (config * policyv1alpha1.HTTPLocalRateLimitSpec ) (* any.Any , error ) {
124
+ if config == nil {
125
+ return nil , nil
126
+ }
127
+
128
+ var fillInterval time.Duration
129
+ switch config .Unit {
130
+ case "second" :
131
+ fillInterval = time .Second
132
+ case "minute" :
133
+ fillInterval = time .Minute
134
+ case "hour" :
135
+ fillInterval = time .Hour
136
+ default :
137
+ return nil , errors .Errorf ("invalid unit %q for HTTP request rate limiting" , config .Unit )
138
+ }
139
+
140
+ rl := & xds_local_ratelimit.LocalRateLimit {
141
+ StatPrefix : httpLocalRateLimiterStatsPrefix ,
142
+ TokenBucket : & xds_type.TokenBucket {
143
+ MaxTokens : config .Requests + config .Burst ,
144
+ TokensPerFill : wrapperspb .UInt32 (config .Requests ),
145
+ FillInterval : durationpb .New (fillInterval ),
146
+ },
147
+ ResponseHeadersToAdd : getRateLimitHeaderValueOptions (config .ResponseHeadersToAdd ),
148
+ FilterEnabled : & xds_core.RuntimeFractionalPercent {
149
+ DefaultValue : & xds_type.FractionalPercent {
150
+ Numerator : 100 ,
151
+ Denominator : xds_type .FractionalPercent_HUNDRED ,
152
+ },
153
+ },
154
+ FilterEnforced : & xds_core.RuntimeFractionalPercent {
155
+ DefaultValue : & xds_type.FractionalPercent {
156
+ Numerator : 100 ,
157
+ Denominator : xds_type .FractionalPercent_HUNDRED ,
158
+ },
159
+ },
160
+ }
161
+
162
+ // Set the response status code if not specified. Envoy defaults to 429 (Too Many Requests).
163
+ if config .ResponseStatusCode > 0 {
164
+ rl .Status = & xds_type.HttpStatus {Code : xds_type .StatusCode (config .ResponseStatusCode )}
165
+ }
166
+
167
+ marshalled , err := anypb .New (rl )
168
+ if err != nil {
169
+ return nil , err
170
+ }
171
+
172
+ return marshalled , nil
173
+ }
174
+
175
+ // getRateLimitHeaderValueOptions returns a list of HeaderValueOption objects corresponding
176
+ // to the given list of rate limiting HTTPHeaderValue objects
177
+ func getRateLimitHeaderValueOptions (headerValues []policyv1alpha1.HTTPHeaderValue ) []* xds_core.HeaderValueOption {
178
+ var hvOptions []* xds_core.HeaderValueOption
179
+
180
+ for _ , hv := range headerValues {
181
+ hvOptions = append (hvOptions , & xds_core.HeaderValueOption {
182
+ Header : & xds_core.HeaderValue {
183
+ Key : hv .Name ,
184
+ Value : hv .Value ,
185
+ },
186
+ Append : & wrappers.BoolValue {
187
+ Value : false ,
188
+ },
189
+ })
190
+ }
191
+
192
+ return hvOptions
193
+ }
194
+
91
195
// BuildIngressConfiguration constructs the Envoy constructs ([]*xds_route.RouteConfiguration) for implementing ingress routes
92
196
func BuildIngressConfiguration (ingress []* trafficpolicy.InboundTrafficPolicy , trustDomain string ) * xds_route.RouteConfiguration {
93
197
if len (ingress ) == 0 {
@@ -176,7 +280,7 @@ func buildInboundRoutes(rules []*trafficpolicy.Rule, trustDomain string) []*xds_
176
280
177
281
// Create an RBAC policy derived from 'trafficpolicy.Rule'
178
282
// Each route is associated with an RBAC policy
179
- rbacPolicyForRoute , err := buildInboundRBACFilterForRule (rule , trustDomain )
283
+ rbacConfig , err := buildInboundRBACFilterForRule (rule , trustDomain )
180
284
if err != nil {
181
285
log .Error ().Err (err ).Str (errcode .Kind , errcode .GetErrCodeWithMetric (errcode .ErrBuildingRBACPolicyForRoute )).
182
286
Msgf ("Error building RBAC policy for rule [%v], skipping route addition" , rule )
@@ -186,13 +290,35 @@ func buildInboundRoutes(rules []*trafficpolicy.Rule, trustDomain string) []*xds_
186
290
// Each HTTP method corresponds to a separate route
187
291
for _ , method := range allowedMethods {
188
292
route := buildRoute (rule .Route , method )
189
- route . TypedPerFilterConfig = rbacPolicyForRoute
293
+ applyInboundRouteConfig ( route , rbacConfig , rule . Route . RateLimit )
190
294
routes = append (routes , route )
191
295
}
192
296
}
193
297
return routes
194
298
}
195
299
300
+ func applyInboundRouteConfig (route * xds_route.Route , rbacConfig * any.Any , rateLimit * policyv1alpha1.HTTPPerRouteRateLimitSpec ) {
301
+ if route == nil {
302
+ return
303
+ }
304
+
305
+ perFilterConfig := make (map [string ]* any.Any )
306
+
307
+ // Apply rate limiting config
308
+ perFilterConfig [envoy .HTTPRBACFilterName ] = rbacConfig
309
+
310
+ // Apply rate limiting config
311
+ if rateLimit != nil && rateLimit .Local != nil {
312
+ if filter , err := getLocalRateLimitFilterConfig (rateLimit .Local ); err != nil {
313
+ log .Error ().Err (err ).Msgf ("Error applying local rate limiting config for route path %s, ignoring it" , route .GetMatch ().GetPath ())
314
+ } else {
315
+ perFilterConfig [envoy .HTTPLocalRateLimitFilterName ] = filter
316
+ }
317
+ }
318
+
319
+ route .TypedPerFilterConfig = perFilterConfig
320
+ }
321
+
196
322
func buildOutboundRoutes (outRoutes []* trafficpolicy.RouteWeightedClusters ) []* xds_route.Route {
197
323
var routes []* xds_route.Route
198
324
for _ , outRoute := range outRoutes {
@@ -289,7 +415,7 @@ func buildWeightedCluster(weightedClusters mapset.Set) *xds_route.WeightedCluste
289
415
290
416
// TODO: Add validation webhook for retry policy
291
417
// Remove checks when validation webhook is implemented
292
- func buildRetryPolicy (retry * v1alpha1 .RetryPolicySpec ) * xds_route.RetryPolicy {
418
+ func buildRetryPolicy (retry * policyv1alpha1 .RetryPolicySpec ) * xds_route.RetryPolicy {
293
419
if retry == nil {
294
420
return nil
295
421
}
0 commit comments