Skip to content

Commit f97cc04

Browse files
authored
Move RPC router from Client/Server and into BaseDeps (hashicorp#8559)
This will allow it to be a shared component which is needed for AutoConfig
1 parent 7c3914d commit f97cc04

15 files changed

+240
-52
lines changed

agent/agent.go

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/hashicorp/consul/agent/dns"
21+
"github.com/hashicorp/consul/agent/router"
2122
"github.com/hashicorp/go-connlimit"
2223
"github.com/hashicorp/go-hclog"
2324
"github.com/hashicorp/go-memdb"
@@ -306,6 +307,9 @@ type Agent struct {
306307
// Connection Pool
307308
connPool *pool.ConnPool
308309

310+
// Shared RPC Router
311+
router *router.Router
312+
309313
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
310314
enterpriseAgent
311315
}
@@ -351,6 +355,7 @@ func New(bd BaseDeps) (*Agent, error) {
351355
MemSink: bd.MetricsHandler,
352356
connPool: bd.ConnPool,
353357
autoConf: bd.AutoConfig,
358+
router: bd.Router,
354359
}
355360

356361
a.serviceManager = NewServiceManager(&a)
@@ -462,6 +467,7 @@ func (a *Agent) Start(ctx context.Context) error {
462467
consul.WithTokenStore(a.tokens),
463468
consul.WithTLSConfigurator(a.tlsConfigurator),
464469
consul.WithConnectionPool(a.connPool),
470+
consul.WithRouter(a.router),
465471
}
466472

467473
// Setup either the client or the server.

agent/agent_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -4741,3 +4741,33 @@ func TestAgent_AutoEncrypt(t *testing.T) {
47414741
require.Len(t, x509Cert.URIs, 1)
47424742
require.Equal(t, id.URI(), x509Cert.URIs[0])
47434743
}
4744+
4745+
func TestSharedRPCRouter(t *testing.T) {
4746+
// this test runs both a server and client and ensures that the shared
4747+
// router is being used. It would be possible for the Client and Server
4748+
// types to create and use their own routers and for RPCs such as the
4749+
// ones used in WaitForTestAgent to succeed. However accessing the
4750+
// router stored on the agent ensures that Serf information from the
4751+
// Client/Server types are being set in the same shared rpc router.
4752+
4753+
srv := NewTestAgent(t, "")
4754+
defer srv.Shutdown()
4755+
4756+
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
4757+
4758+
mgr, server := srv.Agent.router.FindLANRoute()
4759+
require.NotNil(t, mgr)
4760+
require.NotNil(t, server)
4761+
4762+
client := NewTestAgent(t, `
4763+
server = false
4764+
bootstrap = false
4765+
retry_join = ["`+srv.Config.SerfBindAddrLAN.String()+`"]
4766+
`)
4767+
4768+
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
4769+
4770+
mgr, server = client.Agent.router.FindLANRoute()
4771+
require.NotNil(t, mgr)
4772+
require.NotNil(t, server)
4773+
}

agent/consul/auto_encrypt.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *Client) RequestAutoEncryptCerts(ctx context.Context, servers []string,
7373
// Check if we know about a server already through gossip. Depending on
7474
// how the agent joined, there might already be one. Also in case this
7575
// gets called because the cert expired.
76-
server := c.routers.FindServer()
76+
server := c.router.FindLANServer()
7777
if server != nil {
7878
servers = []string{server.Addr.String()}
7979
}

agent/consul/client.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hashicorp/consul/lib"
1616
"github.com/hashicorp/consul/logging"
1717
"github.com/hashicorp/consul/tlsutil"
18+
"github.com/hashicorp/consul/types"
1819
"github.com/hashicorp/go-hclog"
1920
"github.com/hashicorp/serf/serf"
2021
"golang.org/x/time/rate"
@@ -59,9 +60,9 @@ type Client struct {
5960
// Connection pool to consul servers
6061
connPool *pool.ConnPool
6162

62-
// routers is responsible for the selection and maintenance of
63+
// router is responsible for the selection and maintenance of
6364
// Consul servers this agent uses for RPC requests
64-
routers *router.Manager
65+
router *router.Router
6566

6667
// rpcLimiter is used to rate limit the total number of RPCs initiated
6768
// from an agent.
@@ -120,12 +121,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
120121
}
121122
}
122123

124+
logger := flat.logger.NamedIntercept(logging.ConsulClient)
125+
123126
// Create client
124127
c := &Client{
125128
config: config,
126129
connPool: connPool,
127130
eventCh: make(chan serf.Event, serfEventBacklog),
128-
logger: flat.logger.NamedIntercept(logging.ConsulClient),
131+
logger: logger,
129132
shutdownCh: make(chan struct{}),
130133
tlsConfigurator: tlsConfigurator,
131134
}
@@ -160,15 +163,22 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
160163
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
161164
}
162165

163-
// Start maintenance task for servers
164-
c.routers = router.New(c.logger, c.shutdownCh, c.serf, c.connPool, "")
165-
go c.routers.Start()
166+
rpcRouter := flat.router
167+
if rpcRouter == nil {
168+
rpcRouter = router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
169+
}
170+
171+
if err := rpcRouter.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
172+
c.Shutdown()
173+
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
174+
}
175+
c.router = rpcRouter
166176

167177
// Start LAN event handlers after the router is complete since the event
168178
// handlers depend on the router and the router depends on Serf.
169179
go c.lanEventHandler()
170180

171-
// This needs to happen after initializing c.routers to prevent a race
181+
// This needs to happen after initializing c.router to prevent a race
172182
// condition where the router manager is used when the pointer is nil
173183
if c.acls.ACLsEnabled() {
174184
go c.monitorACLMode()
@@ -276,7 +286,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
276286
firstCheck := time.Now()
277287

278288
TRY:
279-
server := c.routers.FindServer()
289+
manager, server := c.router.FindLANRoute()
280290
if server == nil {
281291
return structs.ErrNoServers
282292
}
@@ -301,7 +311,7 @@ TRY:
301311
"error", rpcErr,
302312
)
303313
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
304-
c.routers.NotifyFailedServer(server)
314+
manager.NotifyFailedServer(server)
305315
if retry := canRetry(args, rpcErr); !retry {
306316
return rpcErr
307317
}
@@ -323,7 +333,7 @@ TRY:
323333
// operation.
324334
func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
325335
replyFn structs.SnapshotReplyFn) error {
326-
server := c.routers.FindServer()
336+
manager, server := c.router.FindLANRoute()
327337
if server == nil {
328338
return structs.ErrNoServers
329339
}
@@ -339,6 +349,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
339349
var reply structs.SnapshotResponse
340350
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
341351
if err != nil {
352+
manager.NotifyFailedServer(server)
342353
return err
343354
}
344355
defer func() {
@@ -367,7 +378,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
367378
// Stats is used to return statistics for debugging and insight
368379
// for various sub-systems
369380
func (c *Client) Stats() map[string]map[string]string {
370-
numServers := c.routers.NumServers()
381+
numServers := c.router.GetLANManager().NumServers()
371382

372383
toString := func(v uint64) string {
373384
return strconv.FormatUint(v, 10)

agent/consul/client_serf.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/hashicorp/consul/agent/structs"
1010
"github.com/hashicorp/consul/lib"
1111
"github.com/hashicorp/consul/logging"
12+
"github.com/hashicorp/consul/types"
1213
"github.com/hashicorp/go-hclog"
1314
"github.com/hashicorp/serf/serf"
1415
)
@@ -115,7 +116,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
115116
continue
116117
}
117118
c.logger.Info("adding server", "server", parts)
118-
c.routers.AddServer(parts)
119+
c.router.AddServer(types.AreaLAN, parts)
119120

120121
// Trigger the callback
121122
if c.config.ServerUp != nil {
@@ -139,7 +140,7 @@ func (c *Client) nodeUpdate(me serf.MemberEvent) {
139140
continue
140141
}
141142
c.logger.Info("updating server", "server", parts.String())
142-
c.routers.AddServer(parts)
143+
c.router.AddServer(types.AreaLAN, parts)
143144
}
144145
}
145146

@@ -151,7 +152,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
151152
continue
152153
}
153154
c.logger.Info("removing server", "server", parts.String())
154-
c.routers.RemoveServer(parts)
155+
c.router.RemoveServer(types.AreaLAN, parts)
155156
}
156157
}
157158

agent/consul/client_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func TestClient_JoinLAN(t *testing.T) {
112112
joinLAN(t, c1, s1)
113113
testrpc.WaitForTestAgent(t, c1.RPC, "dc1")
114114
retry.Run(t, func(r *retry.R) {
115-
if got, want := c1.routers.NumServers(), 1; got != want {
115+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
116116
r.Fatalf("got %d servers want %d", got, want)
117117
}
118118
if got, want := len(s1.LANMembers()), 2; got != want {
@@ -150,7 +150,7 @@ func TestClient_LANReap(t *testing.T) {
150150

151151
// Check the router has both
152152
retry.Run(t, func(r *retry.R) {
153-
server := c1.routers.FindServer()
153+
server := c1.router.FindLANServer()
154154
require.NotNil(t, server)
155155
require.Equal(t, s1.config.NodeName, server.Name)
156156
})
@@ -160,7 +160,7 @@ func TestClient_LANReap(t *testing.T) {
160160

161161
retry.Run(t, func(r *retry.R) {
162162
require.Len(r, c1.LANMembers(), 1)
163-
server := c1.routers.FindServer()
163+
server := c1.router.FindLANServer()
164164
require.Nil(t, server)
165165
})
166166
}
@@ -390,7 +390,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
390390
}
391391

392392
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete
393-
c.routers.ResetRebalanceTimer()
393+
c.router.GetLANManager().ResetRebalanceTimer()
394394
time.Sleep(time.Second)
395395

396396
if len(c.LANMembers()) != numServers+numClients {
@@ -406,7 +406,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
406406
var pingCount int
407407
for range servers {
408408
time.Sleep(200 * time.Millisecond)
409-
s := c.routers.FindServer()
409+
m, s := c.router.FindLANRoute()
410410
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
411411
if !ok {
412412
t.Errorf("Unable to ping server %v: %s", s.String(), err)
@@ -415,7 +415,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
415415

416416
// Artificially fail the server in order to rotate the server
417417
// list
418-
c.routers.NotifyFailedServer(s)
418+
m.NotifyFailedServer(s)
419419
}
420420

421421
if pingCount != numServers {
@@ -524,7 +524,7 @@ func TestClient_SnapshotRPC(t *testing.T) {
524524

525525
// Wait until we've got a healthy server.
526526
retry.Run(t, func(r *retry.R) {
527-
if got, want := c1.routers.NumServers(), 1; got != want {
527+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
528528
r.Fatalf("got %d servers want %d", got, want)
529529
}
530530
})
@@ -559,7 +559,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
559559

560560
joinLAN(t, c1, s1)
561561
retry.Run(t, func(r *retry.R) {
562-
if got, want := c1.routers.NumServers(), 1; got != want {
562+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
563563
r.Fatalf("got %d servers want %d", got, want)
564564
}
565565
})
@@ -607,7 +607,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
607607
}
608608

609609
// Wait until we've got a healthy server.
610-
if got, want := c1.routers.NumServers(), 1; got != want {
610+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
611611
r.Fatalf("got %d servers want %d", got, want)
612612
}
613613
})

agent/consul/coordinate_endpoint.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/hashicorp/consul/agent/consul/state"
1111
"github.com/hashicorp/consul/agent/structs"
1212
"github.com/hashicorp/consul/logging"
13+
"github.com/hashicorp/consul/types"
1314
"github.com/hashicorp/go-hclog"
1415
"github.com/hashicorp/go-memdb"
1516
)
@@ -161,23 +162,32 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
161162

162163
// ListDatacenters returns the list of datacenters and their respective nodes
163164
// and the raw coordinates of those nodes (if no coordinates are available for
164-
// any of the nodes, the node list may be empty).
165+
// any of the nodes, the node list may be empty). This endpoint will not return
166+
// information about the LAN network area.
165167
func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
166168
maps, err := c.srv.router.GetDatacenterMaps()
167169
if err != nil {
168170
return err
169171
}
170172

173+
var out []structs.DatacenterMap
174+
171175
// Strip the datacenter suffixes from all the node names.
172-
for i := range maps {
173-
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
174-
for j := range maps[i].Coordinates {
175-
node := maps[i].Coordinates[j].Node
176-
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
176+
for _, dcMap := range maps {
177+
if dcMap.AreaID == types.AreaLAN {
178+
continue
179+
}
180+
181+
suffix := fmt.Sprintf(".%s", dcMap.Datacenter)
182+
for j := range dcMap.Coordinates {
183+
node := dcMap.Coordinates[j].Node
184+
dcMap.Coordinates[j].Node = strings.TrimSuffix(node, suffix)
177185
}
186+
187+
out = append(out, dcMap)
178188
}
179189

180-
*reply = maps
190+
*reply = out
181191
return nil
182192
}
183193

agent/consul/options.go

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consul
22

33
import (
44
"github.com/hashicorp/consul/agent/pool"
5+
"github.com/hashicorp/consul/agent/router"
56
"github.com/hashicorp/consul/agent/token"
67
"github.com/hashicorp/consul/tlsutil"
78
"github.com/hashicorp/go-hclog"
@@ -12,6 +13,7 @@ type consulOptions struct {
1213
tlsConfigurator *tlsutil.Configurator
1314
connPool *pool.ConnPool
1415
tokens *token.Store
16+
router *router.Router
1517
}
1618

1719
type ConsulOption func(*consulOptions)
@@ -40,6 +42,12 @@ func WithTokenStore(tokens *token.Store) ConsulOption {
4042
}
4143
}
4244

45+
func WithRouter(router *router.Router) ConsulOption {
46+
return func(opt *consulOptions) {
47+
opt.router = router
48+
}
49+
}
50+
4351
func flattenConsulOptions(options []ConsulOption) consulOptions {
4452
var flat consulOptions
4553
for _, opt := range options {

0 commit comments

Comments
 (0)