Skip to content

Commit b418716

Browse files
relistanpatoms
andauthored
Suport websockets as a ProxyMode for Envoy (#62)
Co-authored-by: Tom Patterer <[email protected]>
1 parent ff3e175 commit b418716

File tree

11 files changed

+178
-44
lines changed

11 files changed

+178
-44
lines changed

.ignore

-1
This file was deleted.

README.md

+9-2
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ how to handle a service it has discovered. It uses these to:
283283
2. How to name the service. `ServiceName=`
284284
3. How to health check the service. `HealthCheck` and `HealthCheckArgs`
285285
4. Whether or not the service is a receiver of Sidecar change events. `SidecarListener`
286-
5. Wether or not Sidecar should entirely ignore this service. `SidecarDiscovery`
287-
6. HAproxy proxy behavior. `ProxyMode`
286+
5. Whether or not Sidecar should entirely ignore this service. `SidecarDiscovery`
287+
6. Envoy or HAproxy proxy behavior. `ProxyMode`
288288

289289
**Service Ports**
290290
Services may be started with one or more `ServicePort_xxx` labels that help
@@ -334,6 +334,13 @@ setting the following Docker label:
334334
ProxyMode=tcp
335335
```
336336

337+
You may also enable Websocket support where it's available (e.g. in Envoy) by
338+
setting:
339+
340+
```
341+
ProxyMode=ws
342+
```
343+
337344
**Templating In Labels**
338345
You sometimes need to pass information in the Docker labels which
339346
is not available to you at the time of container creation. One example of this

config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type HAproxyConfig struct {
2121
PidFile string `envconfig:"PID_FILE" default:"/var/run/haproxy.pid"`
2222
Disable bool `envconfig:"DISABLE"`
2323
User string `envconfig:"USER" default:"haproxy"`
24-
Group string `envconfig:"GROUP" default:"haproxy"`
24+
Group string `envconfig:"GROUP" default:""`
2525
UseHostnames bool `envconfig:"USE_HOSTNAMES"`
2626
}
2727

envoy/adapter/adapter.go

+78-21
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/golang/protobuf/ptypes/duration"
2323
"github.com/golang/protobuf/ptypes/wrappers"
2424
log "github.com/sirupsen/logrus"
25+
anypb "google.golang.org/protobuf/types/known/anypb"
2526
)
2627

2728
const (
@@ -160,17 +161,14 @@ func EnvoyResourcesFromState(state *catalog.ServicesState, bindIP string,
160161
}
161162
}
162163

163-
// envoyListenerFromService creates an Envoy listener from a service instance
164-
func envoyListenerFromService(svc *service.Service, envoyServiceName string,
165-
servicePort int64, bindIP string) (cache_types.Resource, error) {
166-
167-
var connectionManagerName string
168-
var connectionManager proto.Message
164+
// connectionManagerForService returns a ConnectionManager configured
165+
// appropriately for the Sidecar service
166+
func connectionManagerForService(svc *service.Service, envoyServiceName string) (managerName string, manager proto.Message, err error) {
169167
switch svc.ProxyMode {
170168
case "http":
171-
connectionManagerName = wellknown.HTTPConnectionManager
169+
managerName = wellknown.HTTPConnectionManager
172170

173-
connectionManager = &hcm.HttpConnectionManager{
171+
manager = &hcm.HttpConnectionManager{
174172
StatPrefix: "ingress_http",
175173
HttpFilters: []*hcm.HttpFilter{{
176174
Name: wellknown.Router,
@@ -201,23 +199,89 @@ func envoyListenerFromService(svc *service.Service, envoyServiceName string,
201199
},
202200
}
203201
case "tcp":
204-
connectionManagerName = wellknown.TCPProxy
202+
managerName = wellknown.TCPProxy
205203

206-
connectionManager = &tcpp.TcpProxy{
204+
manager = &tcpp.TcpProxy{
207205
StatPrefix: "ingress_tcp",
208206
ClusterSpecifier: &tcpp.TcpProxy_Cluster{
209207
Cluster: envoyServiceName,
210208
},
211209
}
210+
case "ws":
211+
managerName = wellknown.HTTPConnectionManager
212+
213+
manager = &hcm.HttpConnectionManager{
214+
StatPrefix: "ingress_http",
215+
HttpFilters: []*hcm.HttpFilter{{
216+
Name: wellknown.Router,
217+
}},
218+
RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{
219+
RouteConfig: &api.RouteConfiguration{
220+
ValidateClusters: &wrappers.BoolValue{Value: false},
221+
VirtualHosts: []*route.VirtualHost{{
222+
Name: svc.Name,
223+
Domains: []string{"*"},
224+
Routes: []*route.Route{{
225+
Match: &route.RouteMatch{
226+
PathSpecifier: &route.RouteMatch_Prefix{
227+
Prefix: "/",
228+
},
229+
},
230+
Action: &route.Route_Route{
231+
Route: &route.RouteAction{
232+
ClusterSpecifier: &route.RouteAction_Cluster{
233+
Cluster: envoyServiceName,
234+
},
235+
Timeout: &duration.Duration{},
236+
},
237+
},
238+
}},
239+
}},
240+
},
241+
},
242+
UpgradeConfigs: []*hcm.HttpConnectionManager_UpgradeConfig{
243+
{
244+
UpgradeType: "websocket",
245+
},
246+
},
247+
}
212248
default:
213-
return nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode)
249+
return "", nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode)
250+
}
251+
252+
// If it was a supported type, return the result
253+
return managerName, manager, nil
254+
}
255+
256+
// filterChainsForService returns a filter chain configured appropriately for
257+
// the Sidecar service
258+
func filterChainsForService(svc *service.Service, managerName string, serializedManager *anypb.Any) []*listener.FilterChain {
259+
return []*listener.FilterChain{{
260+
Filters: []*listener.Filter{{
261+
Name: managerName,
262+
ConfigType: &listener.Filter_TypedConfig{
263+
TypedConfig: serializedManager,
264+
},
265+
}},
266+
}}
267+
}
268+
269+
// envoyListenerFromService creates an Envoy listener from a service instance
270+
func envoyListenerFromService(svc *service.Service, envoyServiceName string,
271+
servicePort int64, bindIP string) (cache_types.Resource, error) {
272+
273+
managerName, manager, err := connectionManagerForService(svc, envoyServiceName)
274+
if err != nil {
275+
return nil, fmt.Errorf("failed to create the connection manager: %w", err)
214276
}
215277

216-
serialisedConnectionManager, err := ptypes.MarshalAny(connectionManager)
278+
serializedManager, err := ptypes.MarshalAny(manager)
217279
if err != nil {
218-
return nil, fmt.Errorf("failed to create the connection manager: %s", err)
280+
return nil, fmt.Errorf("failed to create the connection manager: %w", err)
219281
}
220282

283+
filterChains := filterChainsForService(svc, managerName, serializedManager)
284+
221285
return &api.Listener{
222286
Name: envoyServiceName,
223287
Address: &core.Address{
@@ -230,14 +294,7 @@ func envoyListenerFromService(svc *service.Service, envoyServiceName string,
230294
},
231295
},
232296
},
233-
FilterChains: []*listener.FilterChain{{
234-
Filters: []*listener.Filter{{
235-
Name: connectionManagerName,
236-
ConfigType: &listener.Filter_TypedConfig{
237-
TypedConfig: serialisedConnectionManager,
238-
},
239-
}},
240-
}},
297+
FilterChains: filterChains,
241298
}, nil
242299
}
243300

envoy/server_test.go

+60-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"io/ioutil"
78
"net"
89
"sort"
910
"testing"
@@ -13,6 +14,7 @@ import (
1314
"github.com/Nitro/sidecar/config"
1415
"github.com/Nitro/sidecar/envoy/adapter"
1516
"github.com/Nitro/sidecar/service"
17+
1618
api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
1719
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
1820
hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
@@ -22,11 +24,15 @@ import (
2224
"github.com/envoyproxy/go-control-plane/pkg/resource/v2"
2325
xds "github.com/envoyproxy/go-control-plane/pkg/server/v2"
2426
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
27+
2528
"github.com/golang/protobuf/ptypes"
2629
"github.com/golang/protobuf/ptypes/any"
30+
"google.golang.org/grpc"
31+
2732
"github.com/relistan/go-director"
33+
log "github.com/sirupsen/logrus"
34+
2835
. "github.com/smartystreets/goconvey/convey"
29-
"google.golang.org/grpc"
3036
)
3137

3238
const (
@@ -53,7 +59,8 @@ func validateListener(serialisedListener *any.Any, svc service.Service) {
5359
filters := filterChains[0].GetFilters()
5460
So(filters, ShouldHaveLength, 1)
5561

56-
if svc.ProxyMode == "http" {
62+
switch svc.ProxyMode {
63+
case "http":
5764
So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager)
5865
connectionManager := &hcm.HttpConnectionManager{}
5966
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
@@ -68,13 +75,33 @@ func validateListener(serialisedListener *any.Any, svc service.Service) {
6875
So(route, ShouldNotBeNil)
6976
So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
7077
So(route.GetTimeout(), ShouldNotBeNil)
71-
} else { // tcp
78+
case "tcp":
7279
So(filters[0].GetName(), ShouldEqual, wellknown.TCPProxy)
7380
connectionManager := &tcpp.TcpProxy{}
7481
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
7582
So(err, ShouldBeNil)
7683
So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_tcp")
7784
So(connectionManager.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
85+
case "ws":
86+
So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager)
87+
connectionManager := &hcm.HttpConnectionManager{}
88+
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
89+
So(err, ShouldBeNil)
90+
So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_http")
91+
So(connectionManager.GetRouteConfig(), ShouldNotBeNil)
92+
So(connectionManager.GetRouteConfig().GetVirtualHosts(), ShouldHaveLength, 1)
93+
virtualHost := connectionManager.GetRouteConfig().GetVirtualHosts()[0]
94+
So(virtualHost.GetName(), ShouldEqual, svc.Name)
95+
So(virtualHost.GetRoutes(), ShouldHaveLength, 1)
96+
route := virtualHost.GetRoutes()[0].GetRoute()
97+
So(route, ShouldNotBeNil)
98+
So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
99+
So(route.GetTimeout(), ShouldNotBeNil)
100+
101+
// websocket stuff
102+
upgradeConfigs := connectionManager.GetUpgradeConfigs()
103+
So(len(upgradeConfigs), ShouldEqual, 1)
104+
So(upgradeConfigs[0].UpgradeType, ShouldEqual, "websocket")
78105
}
79106
}
80107

@@ -137,7 +164,8 @@ func (sv *EnvoyMock) GetResource(stream envoy_discovery.AggregatedDiscoveryServi
137164
So(err, ShouldBeNil)
138165
}
139166

140-
// Recv() blocks until the stream ctx expires if the message sent via Send() is not recognised / valid
167+
// Recv() blocks until the stream ctx expires if the message sent via
168+
// Send() is not recognised / valid
141169
response, err := stream.Recv()
142170

143171
So(err, ShouldBeNil)
@@ -155,8 +183,8 @@ func (sv *EnvoyMock) ValidateResources(stream envoy_discovery.AggregatedDiscover
155183
}
156184
}
157185

158-
// SnapshotCache is a light wrapper around cache.SnapshotCache which lets
159-
// us get a notification after calling SetSnapshot via the Waiter chan
186+
// SnapshotCache is a light wrapper around cache.SnapshotCache which lets us
187+
// get a notification after calling SetSnapshot via the Waiter chan
160188
type SnapshotCache struct {
161189
cache.SnapshotCache
162190
Waiter chan struct{}
@@ -184,6 +212,8 @@ func Test_PortForServicePort(t *testing.T) {
184212
BindIP: bindIP,
185213
}
186214

215+
log.SetOutput(ioutil.Discard)
216+
187217
state := catalog.NewServicesState()
188218

189219
dummyHostname := "carcasone"
@@ -227,6 +257,19 @@ func Test_PortForServicePort(t *testing.T) {
227257
},
228258
}
229259

260+
wsSvc := service.Service{
261+
ID: "deadbeef666",
262+
Name: "kafka",
263+
Created: baseTime,
264+
Hostname: dummyHostname,
265+
Updated: baseTime,
266+
Status: service.ALIVE,
267+
ProxyMode: "ws",
268+
Ports: []service.Port{
269+
{IP: "127.0.0.1", Port: 6666, ServicePort: 10102},
270+
},
271+
}
272+
230273
ctx, cancel := context.WithCancel(context.Background())
231274
Reset(func() {
232275
cancel()
@@ -242,14 +285,14 @@ func Test_PortForServicePort(t *testing.T) {
242285
xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}),
243286
}
244287

245-
// The gRPC listener will be assigned a random port and will be owned and managed
246-
// by the gRPC server
288+
// The gRPC listener will be assigned a random port and will be owned
289+
// and managed by the gRPC server
247290
lis, err := net.Listen("tcp", ":0")
248291
So(err, ShouldBeNil)
249292
So(lis.Addr(), ShouldHaveSameTypeAs, &net.TCPAddr{})
250293

251-
// Using a FreeLooper instead would make it run too often, triggering spurious
252-
// locking on the state, which can cause the tests to time out
294+
// Using a FreeLooper instead would make it run too often, triggering
295+
// spurious locking on the state, which can cause the tests to time out
253296
go server.Run(ctx, director.NewTimedLooper(director.FOREVER, 10*time.Millisecond, make(chan error)), lis)
254297

255298
Convey("sends the Envoy state via gRPC", func() {
@@ -317,6 +360,13 @@ func Test_PortForServicePort(t *testing.T) {
317360
envoyMock.ValidateResources(stream, tcpSvc, state.Hostname)
318361
})
319362

363+
Convey("for a Websocket service", func() {
364+
state.AddServiceEntry(wsSvc)
365+
<-snapshotCache.Waiter
366+
367+
envoyMock.ValidateResources(stream, wsSvc, state.Hostname)
368+
})
369+
320370
Convey("and skips tombstones", func() {
321371
httpSvc.Tombstone()
322372
state.AddServiceEntry(httpSvc)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ require (
3838
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect
3939
golang.org/x/text v0.3.2 // indirect
4040
google.golang.org/grpc v1.27.0
41+
google.golang.org/protobuf v1.23.0
4142
gopkg.in/alecthomas/kingpin.v2 v2.2.5
4243
gopkg.in/jarcoal/httpmock.v1 v1.0.0-20170412085702-cf52904a3cf0
4344
gopkg.in/relistan/rubberneck.v1 v1.0.1

haproxy/haproxy.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type HAproxy struct {
4343

4444
// Constructs a properly configured HAProxy and returns a pointer to it
4545
func New(configFile string, pidFile string) *HAproxy {
46-
reloadCmd := "haproxy -f " + configFile + " -p " + pidFile + " `[[ -f " + pidFile + " ]] && echo \"-sf $(cat " + pidFile + ")\"]]`"
46+
reloadCmd := "haproxy -f " + configFile + " -p " + pidFile + " `[[ -f " + pidFile + " ]] && echo \"-sf $(cat " + pidFile + ")\"`"
4747
verifyCmd := "haproxy -c -f " + configFile
4848

4949
proxy := HAproxy{
@@ -337,7 +337,12 @@ func getModes(state *catalog.ServicesState) map[string]string {
337337
modeMap := make(map[string]string)
338338
state.EachService(
339339
func(hostname *string, serviceId *string, svc *service.Service) {
340-
modeMap[svc.Name] = svc.ProxyMode
340+
mode := svc.ProxyMode
341+
// Treat websockets like HTTP
342+
if mode == "ws" {
343+
mode = "http"
344+
}
345+
modeMap[svc.Name] = mode
341346
},
342347
)
343348
return modeMap

0 commit comments

Comments
 (0)