diff --git a/pkg/envoy/eds/cluster_load_assignment.go b/pkg/envoy/eds/cluster_load_assignment.go index 2ecaef48f1..7172112c7b 100644 --- a/pkg/envoy/eds/cluster_load_assignment.go +++ b/pkg/envoy/eds/cluster_load_assignment.go @@ -29,26 +29,32 @@ func newClusterLoadAssignment(svc service.MeshService, serviceEndpoints []endpoi }, } - lenIPs := len(serviceEndpoints) - if lenIPs == 0 { - lenIPs = 1 + // If there are no service endpoints corresponding to this service, we + // return a ClusterLoadAssignment without any endpoints. + // Envoy will correctly handle this response. + // This can happen if we create a cluster via CDS corresponding to a traffic split + // apex service that has no endpoints. + if len(serviceEndpoints) == 0 { + return cla } - weight := uint32(100 / lenIPs) + + // Equal weight is assigned to a cluster with multiple endpoints in the same locality + lbWeightPerEndpoint := 100 / len(serviceEndpoints) for _, meshEndpoint := range serviceEndpoints { - log.Trace().Msgf("Adding Endpoint: Cluster=%s, Services=%s, Endpoint=%s, Weight=%d", svc, svc, meshEndpoint, weight) - lbEpt := xds_endpoint.LbEndpoint{ + log.Trace().Msgf("Adding Endpoint: cluster=%s, endpoint=%s, weight=%d", svc, meshEndpoint, lbWeightPerEndpoint) + lbEpt := &xds_endpoint.LbEndpoint{ HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ Endpoint: &xds_endpoint.Endpoint{ Address: envoy.GetAddress(meshEndpoint.IP.String(), uint32(meshEndpoint.Port)), }, }, LoadBalancingWeight: &wrappers.UInt32Value{ - Value: weight, + Value: uint32(lbWeightPerEndpoint), }, } - cla.Endpoints[0].LbEndpoints = append(cla.Endpoints[0].LbEndpoints, &lbEpt) + cla.Endpoints[0].LbEndpoints = append(cla.Endpoints[0].LbEndpoints, lbEpt) } - log.Trace().Msgf("Constructed ClusterLoadAssignment: %v", cla) + return cla } diff --git a/pkg/envoy/eds/cluster_load_assignment_test.go b/pkg/envoy/eds/cluster_load_assignment_test.go index cbf9ce7183..a1dedb5128 100644 --- a/pkg/envoy/eds/cluster_load_assignment_test.go +++ b/pkg/envoy/eds/cluster_load_assignment_test.go @@ -4,48 +4,89 @@ import ( "net" "testing" + xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + "github.com/golang/protobuf/ptypes/wrappers" + "github.com/google/go-cmp/cmp" tassert "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/testing/protocmp" "github.com/openservicemesh/osm/pkg/endpoint" + "github.com/openservicemesh/osm/pkg/envoy" "github.com/openservicemesh/osm/pkg/service" ) func TestNewClusterLoadAssignment(t *testing.T) { - assert := tassert.New(t) - - namespacedServices := []service.MeshService{ - {Namespace: "ns1", Name: "bookstore-1", TargetPort: 80}, - {Namespace: "ns2", Name: "bookstore-2", TargetPort: 90}, - } - - allServiceEndpoints := map[service.MeshService][]endpoint.Endpoint{ - namespacedServices[0]: { - {IP: net.IP("0.0.0.0")}, + testCases := []struct { + name string + svc service.MeshService + endpoints []endpoint.Endpoint + expected *xds_endpoint.ClusterLoadAssignment + }{ + { + name: "multiple endpoints per cluster within the same locality", + svc: service.MeshService{Namespace: "ns1", Name: "bookstore-1", TargetPort: 80}, + endpoints: []endpoint.Endpoint{ + {IP: net.ParseIP("1.1.1.1"), Port: 80}, + {IP: net.ParseIP("2.2.2.2"), Port: 80}, + }, + expected: &xds_endpoint.ClusterLoadAssignment{ + ClusterName: "ns1/bookstore-1|80", + Endpoints: []*xds_endpoint.LocalityLbEndpoints{ + { + Locality: &xds_core.Locality{ + Zone: zone, + }, + LbEndpoints: []*xds_endpoint.LbEndpoint{ + { + HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ + Endpoint: &xds_endpoint.Endpoint{ + Address: envoy.GetAddress("1.1.1.1", 80), + }, + }, + LoadBalancingWeight: &wrappers.UInt32Value{ + Value: 50, + }, + }, + { + HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ + Endpoint: &xds_endpoint.Endpoint{ + Address: envoy.GetAddress("2.2.2.2", 80), + }, + }, + LoadBalancingWeight: &wrappers.UInt32Value{ + Value: 50, + }, + }, + }, + }, + }, + }, }, - namespacedServices[1]: { - {IP: net.IP("0.0.0.1")}, - {IP: net.IP("0.0.0.2")}, + { + name: "no endpoints for cluster", + svc: service.MeshService{Namespace: "ns1", Name: "bookstore-1", TargetPort: 80}, + endpoints: nil, + expected: &xds_endpoint.ClusterLoadAssignment{ + ClusterName: "ns1/bookstore-1|80", + Endpoints: []*xds_endpoint.LocalityLbEndpoints{ + { + Locality: &xds_core.Locality{ + Zone: zone, + }, + LbEndpoints: []*xds_endpoint.LbEndpoint{}, + }, + }, + }, }, } - cla := newClusterLoadAssignment(namespacedServices[0], allServiceEndpoints[namespacedServices[0]]) - assert.NotNil(cla) - assert.Equal(cla.ClusterName, "ns1/bookstore-1|80") - assert.Len(cla.Endpoints, 1) - assert.Len(cla.Endpoints[0].LbEndpoints, 1) - assert.Equal(cla.Endpoints[0].LbEndpoints[0].GetLoadBalancingWeight().Value, uint32(100)) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert := tassert.New(t) - cla2 := newClusterLoadAssignment(namespacedServices[1], allServiceEndpoints[namespacedServices[1]]) - assert.NotNil(cla2) - assert.Equal(cla2.ClusterName, "ns2/bookstore-2|90") - assert.Len(cla2.Endpoints, 1) - assert.Len(cla2.Endpoints[0].LbEndpoints, 2) - assert.Equal(cla2.Endpoints[0].LbEndpoints[0].GetLoadBalancingWeight().Value, uint32(50)) - assert.Equal(cla2.Endpoints[0].LbEndpoints[1].GetLoadBalancingWeight().Value, uint32(50)) - - cla3 := newClusterLoadAssignment(namespacedServices[0], []endpoint.Endpoint{}) - assert.NotNil(cla3) - assert.Equal(cla3.ClusterName, "ns1/bookstore-1|80") - assert.Len(cla3.Endpoints, 1) - assert.Len(cla3.Endpoints[0].LbEndpoints, 0) + actual := newClusterLoadAssignment(tc.svc, tc.endpoints) + assert.True(cmp.Equal(tc.expected, actual, protocmp.Transform()), cmp.Diff(tc.expected, actual, protocmp.Transform())) + }) + } } diff --git a/pkg/envoy/eds/response.go b/pkg/envoy/eds/response.go index f6c7d61fc3..cebf27afda 100644 --- a/pkg/envoy/eds/response.go +++ b/pkg/envoy/eds/response.go @@ -50,11 +50,6 @@ func fulfillEDSRequest(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, re continue } endpoints := meshCatalog.ListAllowedUpstreamEndpointsForService(proxyIdentity, meshSvc) - if len(endpoints) == 0 { - log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrEndpointsNotFound)). - Msgf("Endpoints not found for upstream cluster %s for proxy identity %s, skipping cluster in EDS response", cluster, proxyIdentity) - continue - } log.Trace().Msgf("Endpoints for upstream cluster %s for downstream proxy identity %s: %v", cluster, proxyIdentity, endpoints) loadAssignment := newClusterLoadAssignment(meshSvc, endpoints) rdsResources = append(rdsResources, loadAssignment)