Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

Allow all headless services, not just those backed by Statefulsets with subdomains #5250

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/cli/verifier/envoy_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/compute/kube"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/trafficpolicy"

"github.com/openservicemesh/osm/pkg/constants"
Expand Down Expand Up @@ -479,11 +480,15 @@ func (v *EnvoyConfigVerifier) getDstMeshServicesForK8sSvc(svc corev1.Service) ([
// us to retrieve the TargetPort for the MeshService.
meshSvc.TargetPort = kube.GetTargetPortFromEndpoints(portSpec.Name, *endpoints)

if !kube.IsHeadlessService(svc) {
meshServices = append(meshServices, meshSvc)
// Even if the service is headless, add it so it can be targeted
meshServices = append(meshServices, meshSvc)

if !k8s.IsHeadlessService(svc) {
continue
}

// Add services corresponding to endpoint hostnames to handle the
// statefulset use-case
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
if address.Hostname == "" {
Expand Down
9 changes: 7 additions & 2 deletions pkg/compute/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,16 @@ func (c *client) serviceToMeshServices(svc corev1.Service) []service.MeshService
} else {
log.Warn().Msgf("k8s service %s/%s does not have endpoints but is being represented as a MeshService", svc.Namespace, svc.Name)
}
if !IsHeadlessService(svc) || endpoints == nil {
meshServices = append(meshServices, meshSvc)

// Even if the service is headless, add it so it can be targeted
meshServices = append(meshServices, meshSvc)

if !k8s.IsHeadlessService(svc) || endpoints == nil {
continue
}

// Add services corresponding to endpoint hostnames to handle the
// statefulset use-case
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
if address.Hostname == "" {
Expand Down
159 changes: 156 additions & 3 deletions pkg/compute/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,68 @@ func TestServiceToMeshServices(t *testing.T) {
TargetPort: 8080,
Protocol: "http",
},
{
Namespace: "ns1",
Name: "s1",
Port: 80,
TargetPort: 8080,
Protocol: "http",
},
},
},
{
name: "k8s headless service with single port and endpoint (no hostname), no appProtocol set",
// Single port on the service maps to a single MeshService.
// Since no appProtocol is specified, MeshService.Protocol should default
// to http because Port.Protocol=TCP
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "p1",
Port: 80,
Protocol: corev1.ProtocolTCP,
},
},
ClusterIP: corev1.ClusterIPNone,
},
},
svcEndpoints: []runtime.Object{
&corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
// Should match svc.Name and svc.Namespace
Namespace: "ns1",
Name: "s1",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "10.1.0.1",
},
},
Ports: []corev1.EndpointPort{
{
// Must match the port of 'svc.Spec.Ports[0]'
Port: 8080, // TargetPort
},
},
},
},
},
},
expected: []service.MeshService{
{
Namespace: "ns1",
Name: "s1",
Port: 80,
TargetPort: 8080,
Protocol: "http",
},
},
},
{
Expand Down Expand Up @@ -2512,6 +2574,92 @@ func TestServiceToMeshServices(t *testing.T) {
TargetPort: 9090,
Protocol: "tcp",
},
{
Namespace: "ns1",
Name: "s1",
Port: 80,
TargetPort: 8080,
Protocol: "http",
},
{
Namespace: "ns1",
Name: "s1",
Port: 90,
TargetPort: 9090,
Protocol: "tcp",
},
},
},
{
name: "multiple ports on k8s headless service with appProtocol specified and no hostname in the endpoints",
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Ports: []corev1.ServicePort{
{
Name: "p1",
Port: 80,
AppProtocol: pointer.StringPtr("http"),
},
{
Name: "p2",
Port: 90,
AppProtocol: pointer.StringPtr("tcp"),
},
},
},
},
svcEndpoints: []runtime.Object{
&corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
// Should match svc.Name and svc.Namespace
Namespace: "ns1",
Name: "s1",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "10.1.0.1",
},
},
Ports: []corev1.EndpointPort{
{
// Must match the port of 'svc.Spec.Ports[0]'
Name: "p1",
Port: 8080, // TargetPort
AppProtocol: pointer.StringPtr("http"),
},
{
// Must match the port of 'svc.Spec.Ports[1]'
Name: "p2",
Port: 9090, // TargetPort
AppProtocol: pointer.StringPtr("tcp"),
},
},
},
},
},
},
expected: []service.MeshService{
{
Namespace: "ns1",
Name: "s1",
Port: 80,
TargetPort: 8080,
Protocol: "http",
},
{
Namespace: "ns1",
Name: "s1",
Port: 90,
TargetPort: 9090,
Protocol: "tcp",
},
},
},
{
Expand Down Expand Up @@ -2682,7 +2830,7 @@ func TestGetMeshService(t *testing.T) {
expectErr: true,
},
{
name: "TargetPort not found as Endpoint does not exist",
name: "TargetPort not found as matching Endpoint does not exist",
svc: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "s1",
Expand All @@ -2693,6 +2841,7 @@ func TestGetMeshService(t *testing.T) {
Name: "p1",
Port: 80,
}},
ClusterIP: corev1.ClusterIPNone,
},
},
endpoints: &corev1.Endpoints{
Expand All @@ -2707,14 +2856,18 @@ func TestGetMeshService(t *testing.T) {
Name: "invalid", // does not match svc port
Port: 8080,
},
{
Name: "invalid2",
Port: 8081, // also does not match svc port; having 2 ports triggers the name filter logic
},
},
},
},
},
namespacedSvc: types.NamespacedName{Namespace: "ns1", Name: "s1"}, // matches svc
port: 80, // matches svc
expectedTargetPort: 0, // matches endpoint's 'p1' port
expectErr: true,
expectedTargetPort: 0, // does not match either of the endpoint's ports
expectErr: false, // port matches, so no error should be expected
},
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/compute/kube/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,3 @@ func GetTargetPortFromEndpoints(endpointName string, endpoints corev1.Endpoints)
}
return
}

// IsHeadlessService determines whether or not a corev1.Service is a headless service
// TODO(4863): unexport this method, it should not be used outside of this package.
func IsHeadlessService(svc corev1.Service) bool {
return len(svc.Spec.ClusterIP) == 0 || svc.Spec.ClusterIP == corev1.ClusterIPNone
}
19 changes: 14 additions & 5 deletions tests/e2e/e2e_client_server_connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/tests"

"github.com/openservicemesh/osm/pkg/constants"
. "github.com/openservicemesh/osm/tests/framework"
Expand All @@ -26,26 +27,34 @@ var _ = OSMDescribe("Test HTTP traffic from 1 pod client -> 1 pod server",
func() {
Context("Test traffic flowing from client to server with a Kubernetes Service for the Source: HTTP", func() {
withSourceKubernetesService := true
testTraffic(withSourceKubernetesService, PodCommandDefault)
testTraffic(withSourceKubernetesService, PodCommandDefault, false)
})

Context("Test traffic flowing from client to server without a Kubernetes Service for the Source: HTTP", func() {
// Prior iterations of OSM required that a source pod belong to a Kubernetes service
// for the Envoy proxy to be configured for outbound traffic to some remote server.
// This test ensures we test this scenario: client Pod is not associated w/ a service.
withSourceKubernetesService := false
testTraffic(withSourceKubernetesService, PodCommandDefault)
testTraffic(withSourceKubernetesService, PodCommandDefault, false)
})

Context("Test traffic flowing from client to a server with a podIP bind", func() {
// Prior iterations of OSM didn't allow mesh services to bind to the podIP
// This test ensures that that behavior is configurable via MeshConfig
withSourceKubernetesService := true
testTraffic(withSourceKubernetesService, []string{"gunicorn", "-b", "$(POD_IP):80", "httpbin:app", "-k", "gevent"}, WithLocalProxyMode(v1alpha2.LocalProxyModePodIP))
testTraffic(withSourceKubernetesService, []string{"gunicorn", "-b", "$(POD_IP):80", "httpbin:app", "-k", "gevent"}, false, WithLocalProxyMode(v1alpha2.LocalProxyModePodIP))
})

Context("Test traffic flowing from client to a headless service without a Kubernetes Service for the Source: HTTP", func() {
// Prior iterations of OSM required that a source pod belong to a Kubernetes service
// for the Envoy proxy to be configured for outbound traffic to some remote server.
// This test ensures we test this scenario: client Pod is not associated w/ a service.
withSourceKubernetesService := true
testTraffic(withSourceKubernetesService, PodCommandDefault, true)
})
})

func testTraffic(withSourceKubernetesService bool, destPodCommand []string, installOpts ...InstallOsmOpt) {
func testTraffic(withSourceKubernetesService bool, destPodCommand []string, destServiceHeadless bool, installOpts ...InstallOsmOpt) {
const sourceName = "client"
const destName = "server"
var ns = []string{sourceName, destName}
Expand All @@ -68,7 +77,7 @@ func testTraffic(withSourceKubernetesService bool, destPodCommand []string, inst
Expect(err).NotTo(HaveOccurred())
_, err = Td.CreatePod(destName, podDef)
Expect(err).NotTo(HaveOccurred())
dstSvc, err := Td.CreateService(destName, svcDef)
dstSvc, err := Td.CreateService(destName, *tests.HeadlessSvc(&svcDef))
Expect(err).NotTo(HaveOccurred())

// Expect it to be up and running in it's receiver namespace
Expand Down
Loading