Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

envoy|catalog: use TrafficMatch to build inbound filter config #4814

Merged
merged 1 commit into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions pkg/catalog/inbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
trafficTargets = mc.meshSpec.ListTrafficTargets(destinationFilter)
}

upstreamSvcSet := mapset.NewSet()
for _, svc := range upstreamServices {
upstreamSvcSet.Add(svc)
}

// A policy (traffic match, route, cluster) must be built for each upstream service. This
// includes apex/root services associated with the given upstream service.
allUpstreamServices := mc.getUpstreamServicesIncludeApex(upstreamServices)
Expand All @@ -59,25 +64,36 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
}
clusterConfigs = append(clusterConfigs, clusterConfigForSvc)

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc})

// ---
// Create a TrafficMatch for this upstream servic.
// The TrafficMatch will be used by LDS to program a filter chain match
// for this upstream service on the upstream server to accept inbound
// traffic.
trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{
Name: upstreamSvc.InboundTrafficMatchName(),
DestinationPort: int(upstreamSvc.TargetPort),
DestinationProtocol: upstreamSvc.Protocol,
}

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc})
if upstreamTrafficSetting != nil {
trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit
//
// Note: a TrafficMatch must exist only for a service part of the given
// 'upstreamServices' list, and not a virtual (apex) service that
// may be returned as a part of the 'allUpstreamServices' list.
// A virtual (apex) service is required for the purpose of building
// HTTP routing rules, but should not result in a TrafficMatch rule
// as TrafficMatch rules are meant to map to actual services backed
// by a proxy, defined by the 'upstreamServices' list.
if upstreamSvcSet.Contains(upstreamSvc) {
trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{
Name: upstreamSvc.InboundTrafficMatchName(),
DestinationPort: int(upstreamSvc.TargetPort),
DestinationProtocol: upstreamSvc.Protocol,
ServerNames: []string{upstreamSvc.ServerName()},
Cluster: upstreamSvc.EnvoyLocalClusterName(),
}
if upstreamTrafficSetting != nil {
trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit
}
trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc)
}

trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc)

// Build the HTTP route configs for this service and port combination.
// If the port's protocol corresponds to TCP, we can skip this step
if upstreamSvc.Protocol == constants.ProtocolTCP || upstreamSvc.Protocol == constants.ProtocolTCPServerFirst {
Expand Down
4 changes: 4 additions & 0 deletions pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,15 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
Name: "inbound_ns1/mysql-0.mysql_3306_tcp",
DestinationPort: 3306,
DestinationProtocol: "tcp",
ServerNames: []string{"mysql-0.mysql.ns1.svc.cluster.local"},
Cluster: "ns1/mysql-0.mysql|3306|local",
},
{
Name: "inbound_ns1/s2_9090_http",
DestinationPort: 9090,
DestinationProtocol: "http",
ServerNames: []string{"s2.ns1.svc.cluster.local"},
Cluster: "ns1/s2|9090|local",
},
},
ClustersConfigs: []*trafficpolicy.MeshClusterConfig{
Expand Down
107 changes: 61 additions & 46 deletions pkg/envoy/lds/inmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/rds/route"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)

Expand All @@ -26,42 +25,50 @@ const (
outboundMeshTCPProxyStatPrefix = "outbound-mesh-tcp-proxy"
)

func (lb *listenerBuilder) getInboundMeshFilterChains(proxyService service.MeshService) []*xds_listener.FilterChain {
func (lb *listenerBuilder) getInboundMeshFilterChains(trafficMatches []*trafficpolicy.TrafficMatch) []*xds_listener.FilterChain {
var filterChains []*xds_listener.FilterChain

// Create protocol specific inbound filter chains for MeshService's TargetPort
switch strings.ToLower(proxyService.Protocol) {
case constants.ProtocolHTTP, constants.ProtocolGRPC:
// Filter chain for HTTP port
filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(proxyService)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort)
}
filterChains = append(filterChains, filterChainForPort)
for _, match := range trafficMatches {
// Create protocol specific inbound filter chains for MeshService's TargetPort
switch strings.ToLower(match.DestinationProtocol) {
case constants.ProtocolHTTP, constants.ProtocolGRPC:
// Filter chain for HTTP port
filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(match)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for traffic match %s", match.Name)
} else {
filterChains = append(filterChains, filterChainForPort)
}

case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
filterChainForPort, err := lb.getInboundMeshTCPFilterChain(proxyService)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound TCP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort)
}
filterChains = append(filterChains, filterChainForPort)
case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
filterChainForPort, err := lb.getInboundMeshTCPFilterChain(match)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound TCP filter chain for traffic match %s", match.Name)
} else {
filterChains = append(filterChains, filterChainForPort)
}

default:
log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for proxy-service:port %s:%d", proxyService.Protocol, proxyService, proxyService.TargetPort)
default:
log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for traffic match %s", match.DestinationProtocol, match.Name)
}
}

return filterChains
}

func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) {
func (lb *listenerBuilder) getInboundHTTPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) {
if trafficMatch == nil {
return nil, nil
}

var filters []*xds_listener.Filter

// Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters.
if !lb.cfg.IsPermissiveTrafficPolicyMode() {
// Apply RBAC policies on the inbound filters based on configured policies
rbacFilter, err := lb.buildRBACFilter()
if err != nil {
log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name)
return nil, err
}
// RBAC filter should be the very first filter in the filter chain
Expand All @@ -71,7 +78,7 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
// Build the HTTP Connection Manager filter from its options
inboundConnManager, err := httpConnManagerOptions{
direction: inbound,
rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(int(proxyService.TargetPort)),
rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(trafficMatch.DestinationPort),

// Additional filters
wasmStatsHeaders: lb.getWASMStatsHeaders(),
Expand All @@ -83,12 +90,12 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
tracingAPIEndpoint: lb.cfg.GetTracingEndpoint(),
}.build()
if err != nil {
return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService)
return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name)
}

marshalledInboundConnManager, err := anypb.New(inboundConnManager)
if err != nil {
return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService)
return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name)
}
httpConnectionManagerFilter := &xds_listener.Filter{
Name: wellknown.HTTPConnectionManager,
Expand All @@ -101,38 +108,40 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
return filters, nil
}

func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) {
func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) {
if trafficMatch == nil {
return nil, nil
}

// Construct HTTP filters
filters, err := lb.getInboundHTTPFilters(proxyService)
filters, err := lb.getInboundHTTPFilters(trafficMatch)
if err != nil {
log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for traffic match %s", trafficMatch.Name)
return nil, err
}

// Construct downstream TLS context
marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar))
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)).
Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService)
Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name)
return nil, err
}

serverNames := []string{proxyService.ServerName()}

filterChain := &xds_listener.FilterChain{
Name: proxyService.InboundTrafficMatchName(),
Name: trafficMatch.Name,
Filters: filters,

// The 'FilterChainMatch' field defines the criteria for matching traffic against filters in this filter chain
FilterChainMatch: &xds_listener.FilterChainMatch{
// The DestinationPort is the service port the downstream directs traffic to
DestinationPort: &wrapperspb.UInt32Value{
Value: uint32(proxyService.TargetPort),
Value: uint32(trafficMatch.DestinationPort),
},

// The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext()
// This is not a field obtained from the mTLS Certificate.
ServerNames: serverNames,
ServerNames: trafficMatch.ServerNames,

// Only match when transport protocol is TLS
TransportProtocol: envoy.TransportProtocolTLS,
Expand All @@ -152,35 +161,37 @@ func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.Me
return filterChain, nil
}

func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) {
func (lb *listenerBuilder) getInboundMeshTCPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) {
if trafficMatch == nil {
return nil, nil
}

// Construct TCP filters
filters, err := lb.getInboundTCPFilters(proxyService)
filters, err := lb.getInboundTCPFilters(trafficMatch)
if err != nil {
log.Error().Err(err).Msgf("Error constructing inbound TCP filters for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error constructing inbound TCP filters for traffic match %s", trafficMatch.Name)
return nil, err
}

// Construct downstream TLS context
marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar))
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)).
Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService)
Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name)
return nil, err
}

serverNames := []string{proxyService.ServerName()}

return &xds_listener.FilterChain{
Name: proxyService.InboundTrafficMatchName(),
Name: trafficMatch.Name,
FilterChainMatch: &xds_listener.FilterChainMatch{
// The DestinationPort is the service port the downstream directs traffic to
DestinationPort: &wrapperspb.UInt32Value{
Value: uint32(proxyService.TargetPort),
Value: uint32(trafficMatch.DestinationPort),
},

// The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext()
// This is not a field obtained from the mTLS Certificate.
ServerNames: serverNames,
ServerNames: trafficMatch.ServerNames,

// Only match when transport protocol is TLS
TransportProtocol: envoy.TransportProtocolTLS,
Expand All @@ -198,15 +209,19 @@ func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.Mes
}, nil
}

func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) {
func (lb *listenerBuilder) getInboundTCPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) {
if trafficMatch == nil {
return nil, nil
}

var filters []*xds_listener.Filter

// Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters.
if !lb.cfg.IsPermissiveTrafficPolicyMode() {
// Apply RBAC policies on the inbound filters based on configured policies
rbacFilter, err := lb.buildRBACFilter()
if err != nil {
log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name)
return nil, err
}
// RBAC filter should be the very first filter in the filter chain
Expand All @@ -215,8 +230,8 @@ func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService

// Apply the TCP Proxy Filter
tcpProxy := &xds_tcp_proxy.TcpProxy{
StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, proxyService.EnvoyLocalClusterName()),
ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: proxyService.EnvoyLocalClusterName()},
StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, trafficMatch.Cluster),
ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: trafficMatch.Cluster},
}
marshalledTCPProxy, err := anypb.New(tcpProxy)
if err != nil {
Expand Down
Loading