@@ -6,10 +6,13 @@ import (
6
6
mapset "github.com/deckarep/golang-set"
7
7
access "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/access/v1alpha3"
8
8
9
+ policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
10
+
9
11
"github.com/openservicemesh/osm/pkg/constants"
10
12
"github.com/openservicemesh/osm/pkg/errcode"
11
13
"github.com/openservicemesh/osm/pkg/identity"
12
14
"github.com/openservicemesh/osm/pkg/k8s"
15
+ "github.com/openservicemesh/osm/pkg/policy"
13
16
"github.com/openservicemesh/osm/pkg/service"
14
17
"github.com/openservicemesh/osm/pkg/smi"
15
18
"github.com/openservicemesh/osm/pkg/trafficpolicy"
@@ -44,6 +47,8 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
44
47
45
48
// Build configurations per upstream service
46
49
for _ , upstreamSvc := range allUpstreamServices {
50
+ upstreamSvc := upstreamSvc // To prevent loop variable memory aliasing in for loop
51
+
47
52
// ---
48
53
// Create local cluster configs for this upstram service
49
54
clusterConfigForSvc := & trafficpolicy.MeshClusterConfig {
@@ -64,6 +69,13 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
64
69
DestinationPort : int (upstreamSvc .TargetPort ),
65
70
DestinationProtocol : upstreamSvc .Protocol ,
66
71
}
72
+
73
+ upstreamTrafficSetting := mc .policyController .GetUpstreamTrafficSetting (
74
+ policy.UpstreamTrafficSettingGetOpt {MeshService : & upstreamSvc })
75
+ if upstreamTrafficSetting != nil {
76
+ trafficMatchForUpstreamSvc .RateLimit = upstreamTrafficSetting .Spec .RateLimit
77
+ }
78
+
67
79
trafficMatches = append (trafficMatches , trafficMatchForUpstreamSvc )
68
80
69
81
// Build the HTTP route configs for this service and port combination.
@@ -77,7 +89,7 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
77
89
// The routes are derived from SMI TrafficTarget and TrafficSplit policies in SMI mode,
78
90
// and are wildcarded in permissive mode. The downstreams that can access this upstream
79
91
// on the configured routes is also determined based on the traffic policy mode.
80
- inboundTrafficPolicies := mc .getInboundTrafficPoliciesForUpstream (upstreamSvc , permissiveMode , trafficTargets )
92
+ inboundTrafficPolicies := mc .getInboundTrafficPoliciesForUpstream (upstreamSvc , permissiveMode , trafficTargets , upstreamTrafficSetting )
81
93
routeConfigPerPort [int (upstreamSvc .TargetPort )] = append (routeConfigPerPort [int (upstreamSvc .TargetPort )], inboundTrafficPolicies )
82
94
}
83
95
@@ -88,35 +100,38 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
88
100
}
89
101
}
90
102
91
- func (mc * MeshCatalog ) getInboundTrafficPoliciesForUpstream (upstreamSvc service.MeshService , permissiveMode bool , trafficTargets []* access.TrafficTarget ) * trafficpolicy.InboundTrafficPolicy {
103
+ func (mc * MeshCatalog ) getInboundTrafficPoliciesForUpstream (upstreamSvc service.MeshService , permissiveMode bool ,
104
+ trafficTargets []* access.TrafficTarget , upstreamTrafficSetting * policyv1alpha1.UpstreamTrafficSetting ) * trafficpolicy.InboundTrafficPolicy {
92
105
var inboundPolicyForUpstreamSvc * trafficpolicy.InboundTrafficPolicy
93
106
94
107
if permissiveMode {
95
108
// Add a wildcard HTTP route that allows any downstream client to access the upstream service
96
109
hostnames := k8s .GetHostnamesForService (upstreamSvc , true /* local namespace FQDN should always be allowed for inbound routes*/ )
97
- inboundPolicyForUpstreamSvc = trafficpolicy .NewInboundTrafficPolicy (upstreamSvc .FQDN (), hostnames )
110
+ inboundPolicyForUpstreamSvc = trafficpolicy .NewInboundTrafficPolicy (upstreamSvc .FQDN (), hostnames , upstreamTrafficSetting )
98
111
localCluster := service.WeightedCluster {
99
112
ClusterName : service .ClusterName (upstreamSvc .EnvoyLocalClusterName ()),
100
113
Weight : constants .ClusterWeightAcceptAll ,
101
114
}
102
115
// Only a single rule for permissive mode.
103
116
inboundPolicyForUpstreamSvc .Rules = []* trafficpolicy.Rule {
104
117
{
105
- Route : * trafficpolicy .NewRouteWeightedCluster (trafficpolicy .WildCardRouteMatch , []service.WeightedCluster {localCluster }),
118
+ Route : * trafficpolicy .NewRouteWeightedCluster (trafficpolicy .WildCardRouteMatch , []service.WeightedCluster {localCluster }, upstreamTrafficSetting ),
106
119
AllowedServiceIdentities : mapset .NewSetWith (identity .WildcardServiceIdentity ),
107
120
},
108
121
}
109
122
} else {
110
123
// Build the HTTP routes from SMI TrafficTarget and HTTPRouteGroup configurations
111
- inboundPolicyForUpstreamSvc = mc .buildInboundHTTPPolicyFromTrafficTarget (upstreamSvc , trafficTargets )
124
+ inboundPolicyForUpstreamSvc = mc .buildInboundHTTPPolicyFromTrafficTarget (upstreamSvc , trafficTargets , upstreamTrafficSetting )
112
125
}
113
126
114
127
return inboundPolicyForUpstreamSvc
115
128
}
116
129
117
- func (mc * MeshCatalog ) buildInboundHTTPPolicyFromTrafficTarget (upstreamSvc service.MeshService , trafficTargets []* access.TrafficTarget ) * trafficpolicy.InboundTrafficPolicy {
130
+ func (mc * MeshCatalog ) buildInboundHTTPPolicyFromTrafficTarget (upstreamSvc service.MeshService , trafficTargets []* access.TrafficTarget ,
131
+ upstreamTrafficSetting * policyv1alpha1.UpstreamTrafficSetting ) * trafficpolicy.InboundTrafficPolicy {
118
132
hostnames := k8s .GetHostnamesForService (upstreamSvc , true /* local namespace FQDN should always be allowed for inbound routes*/ )
119
- inboundPolicy := trafficpolicy .NewInboundTrafficPolicy (upstreamSvc .FQDN (), hostnames )
133
+ inboundPolicy := trafficpolicy .NewInboundTrafficPolicy (upstreamSvc .FQDN (), hostnames , upstreamTrafficSetting )
134
+
120
135
localCluster := service.WeightedCluster {
121
136
ClusterName : service .ClusterName (upstreamSvc .EnvoyLocalClusterName ()),
122
137
Weight : constants .ClusterWeightAcceptAll ,
@@ -125,7 +140,7 @@ func (mc *MeshCatalog) buildInboundHTTPPolicyFromTrafficTarget(upstreamSvc servi
125
140
var routingRules []* trafficpolicy.Rule
126
141
// From each TrafficTarget and HTTPRouteGroup configuration associated with this service, build routes for it.
127
142
for _ , trafficTarget := range trafficTargets {
128
- rules := mc .getRoutingRulesFromTrafficTarget (* trafficTarget , localCluster )
143
+ rules := mc .getRoutingRulesFromTrafficTarget (* trafficTarget , localCluster , upstreamTrafficSetting )
129
144
// Multiple TrafficTarget objects can reference the same route, in which case such routes
130
145
// need to be merged to create a single route that includes all the downstream client identities
131
146
// this route is authorized for.
@@ -136,7 +151,8 @@ func (mc *MeshCatalog) buildInboundHTTPPolicyFromTrafficTarget(upstreamSvc servi
136
151
return inboundPolicy
137
152
}
138
153
139
- func (mc * MeshCatalog ) getRoutingRulesFromTrafficTarget (trafficTarget access.TrafficTarget , routingCluster service.WeightedCluster ) []* trafficpolicy.Rule {
154
+ func (mc * MeshCatalog ) getRoutingRulesFromTrafficTarget (trafficTarget access.TrafficTarget , routingCluster service.WeightedCluster ,
155
+ upstreamTrafficSetting * policyv1alpha1.UpstreamTrafficSetting ) []* trafficpolicy.Rule {
140
156
// Compute the HTTP route matches associated with the given TrafficTarget object
141
157
httpRouteMatches , err := mc .routesFromRules (trafficTarget .Spec .Rules , trafficTarget .Namespace )
142
158
if err != nil {
@@ -155,7 +171,7 @@ func (mc *MeshCatalog) getRoutingRulesFromTrafficTarget(trafficTarget access.Tra
155
171
var routingRules []* trafficpolicy.Rule
156
172
for _ , httpRouteMatch := range httpRouteMatches {
157
173
rule := & trafficpolicy.Rule {
158
- Route : * trafficpolicy .NewRouteWeightedCluster (httpRouteMatch , []service.WeightedCluster {routingCluster }),
174
+ Route : * trafficpolicy .NewRouteWeightedCluster (httpRouteMatch , []service.WeightedCluster {routingCluster }, upstreamTrafficSetting ),
159
175
AllowedServiceIdentities : allowedDownstreamIdentities ,
160
176
}
161
177
routingRules = append (routingRules , rule )
0 commit comments