Skip to content

Commit fafc6cf

Browse files
mkeelerhashicorp-ci
authored andcommitted
Move RPC router from Client/Server and into BaseDeps (#8559)
This will allow it to be a shared component which is needed for AutoConfig
1 parent 3d1e043 commit fafc6cf

15 files changed

+240
-52
lines changed

agent/agent.go

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/hashicorp/consul/agent/dns"
21+
"github.com/hashicorp/consul/agent/router"
2122
"github.com/hashicorp/go-connlimit"
2223
"github.com/hashicorp/go-hclog"
2324
"github.com/hashicorp/go-memdb"
@@ -307,6 +308,9 @@ type Agent struct {
307308
// Connection Pool
308309
connPool *pool.ConnPool
309310

311+
// Shared RPC Router
312+
router *router.Router
313+
310314
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
311315
enterpriseAgent
312316
}
@@ -352,6 +356,7 @@ func New(bd BaseDeps) (*Agent, error) {
352356
MemSink: bd.MetricsHandler,
353357
connPool: bd.ConnPool,
354358
autoConf: bd.AutoConfig,
359+
router: bd.Router,
355360
}
356361

357362
a.serviceManager = NewServiceManager(&a)
@@ -463,6 +468,7 @@ func (a *Agent) Start(ctx context.Context) error {
463468
consul.WithTokenStore(a.tokens),
464469
consul.WithTLSConfigurator(a.tlsConfigurator),
465470
consul.WithConnectionPool(a.connPool),
471+
consul.WithRouter(a.router),
466472
}
467473

468474
// Setup either the client or the server.

agent/agent_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -4752,3 +4752,33 @@ func TestAgent_AutoEncrypt(t *testing.T) {
47524752
require.Len(t, x509Cert.URIs, 1)
47534753
require.Equal(t, id.URI(), x509Cert.URIs[0])
47544754
}
4755+
4756+
func TestSharedRPCRouter(t *testing.T) {
4757+
// this test runs both a server and client and ensures that the shared
4758+
// router is being used. It would be possible for the Client and Server
4759+
// types to create and use their own routers and for RPCs such as the
4760+
// ones used in WaitForTestAgent to succeed. However accessing the
4761+
// router stored on the agent ensures that Serf information from the
4762+
// Client/Server types are being set in the same shared rpc router.
4763+
4764+
srv := NewTestAgent(t, "")
4765+
defer srv.Shutdown()
4766+
4767+
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
4768+
4769+
mgr, server := srv.Agent.router.FindLANRoute()
4770+
require.NotNil(t, mgr)
4771+
require.NotNil(t, server)
4772+
4773+
client := NewTestAgent(t, `
4774+
server = false
4775+
bootstrap = false
4776+
retry_join = ["`+srv.Config.SerfBindAddrLAN.String()+`"]
4777+
`)
4778+
4779+
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
4780+
4781+
mgr, server = client.Agent.router.FindLANRoute()
4782+
require.NotNil(t, mgr)
4783+
require.NotNil(t, server)
4784+
}

agent/consul/auto_encrypt.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *Client) RequestAutoEncryptCerts(ctx context.Context, servers []string,
7373
// Check if we know about a server already through gossip. Depending on
7474
// how the agent joined, there might already be one. Also in case this
7575
// gets called because the cert expired.
76-
server := c.routers.FindServer()
76+
server := c.router.FindLANServer()
7777
if server != nil {
7878
servers = []string{server.Addr.String()}
7979
}

agent/consul/client.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hashicorp/consul/lib"
1616
"github.com/hashicorp/consul/logging"
1717
"github.com/hashicorp/consul/tlsutil"
18+
"github.com/hashicorp/consul/types"
1819
"github.com/hashicorp/go-hclog"
1920
"github.com/hashicorp/serf/serf"
2021
"golang.org/x/time/rate"
@@ -59,9 +60,9 @@ type Client struct {
5960
// Connection pool to consul servers
6061
connPool *pool.ConnPool
6162

62-
// routers is responsible for the selection and maintenance of
63+
// router is responsible for the selection and maintenance of
6364
// Consul servers this agent uses for RPC requests
64-
routers *router.Manager
65+
router *router.Router
6566

6667
// rpcLimiter is used to rate limit the total number of RPCs initiated
6768
// from an agent.
@@ -120,12 +121,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
120121
}
121122
}
122123

124+
logger := flat.logger.NamedIntercept(logging.ConsulClient)
125+
123126
// Create client
124127
c := &Client{
125128
config: config,
126129
connPool: connPool,
127130
eventCh: make(chan serf.Event, serfEventBacklog),
128-
logger: flat.logger.NamedIntercept(logging.ConsulClient),
131+
logger: logger,
129132
shutdownCh: make(chan struct{}),
130133
tlsConfigurator: tlsConfigurator,
131134
}
@@ -160,15 +163,22 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
160163
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
161164
}
162165

163-
// Start maintenance task for servers
164-
c.routers = router.New(c.logger, c.shutdownCh, c.serf, c.connPool, "")
165-
go c.routers.Start()
166+
rpcRouter := flat.router
167+
if rpcRouter == nil {
168+
rpcRouter = router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
169+
}
170+
171+
if err := rpcRouter.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
172+
c.Shutdown()
173+
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
174+
}
175+
c.router = rpcRouter
166176

167177
// Start LAN event handlers after the router is complete since the event
168178
// handlers depend on the router and the router depends on Serf.
169179
go c.lanEventHandler()
170180

171-
// This needs to happen after initializing c.routers to prevent a race
181+
// This needs to happen after initializing c.router to prevent a race
172182
// condition where the router manager is used when the pointer is nil
173183
if c.acls.ACLsEnabled() {
174184
go c.monitorACLMode()
@@ -276,7 +286,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
276286
firstCheck := time.Now()
277287

278288
TRY:
279-
server := c.routers.FindServer()
289+
manager, server := c.router.FindLANRoute()
280290
if server == nil {
281291
return structs.ErrNoServers
282292
}
@@ -301,7 +311,7 @@ TRY:
301311
"error", rpcErr,
302312
)
303313
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
304-
c.routers.NotifyFailedServer(server)
314+
manager.NotifyFailedServer(server)
305315
if retry := canRetry(args, rpcErr); !retry {
306316
return rpcErr
307317
}
@@ -323,7 +333,7 @@ TRY:
323333
// operation.
324334
func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
325335
replyFn structs.SnapshotReplyFn) error {
326-
server := c.routers.FindServer()
336+
manager, server := c.router.FindLANRoute()
327337
if server == nil {
328338
return structs.ErrNoServers
329339
}
@@ -339,6 +349,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
339349
var reply structs.SnapshotResponse
340350
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
341351
if err != nil {
352+
manager.NotifyFailedServer(server)
342353
return err
343354
}
344355
defer func() {
@@ -367,7 +378,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
367378
// Stats is used to return statistics for debugging and insight
368379
// for various sub-systems
369380
func (c *Client) Stats() map[string]map[string]string {
370-
numServers := c.routers.NumServers()
381+
numServers := c.router.GetLANManager().NumServers()
371382

372383
toString := func(v uint64) string {
373384
return strconv.FormatUint(v, 10)

agent/consul/client_serf.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/hashicorp/consul/agent/structs"
1010
"github.com/hashicorp/consul/lib"
1111
"github.com/hashicorp/consul/logging"
12+
"github.com/hashicorp/consul/types"
1213
"github.com/hashicorp/go-hclog"
1314
"github.com/hashicorp/serf/serf"
1415
)
@@ -115,7 +116,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
115116
continue
116117
}
117118
c.logger.Info("adding server", "server", parts)
118-
c.routers.AddServer(parts)
119+
c.router.AddServer(types.AreaLAN, parts)
119120

120121
// Trigger the callback
121122
if c.config.ServerUp != nil {
@@ -139,7 +140,7 @@ func (c *Client) nodeUpdate(me serf.MemberEvent) {
139140
continue
140141
}
141142
c.logger.Info("updating server", "server", parts.String())
142-
c.routers.AddServer(parts)
143+
c.router.AddServer(types.AreaLAN, parts)
143144
}
144145
}
145146

@@ -151,7 +152,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
151152
continue
152153
}
153154
c.logger.Info("removing server", "server", parts.String())
154-
c.routers.RemoveServer(parts)
155+
c.router.RemoveServer(types.AreaLAN, parts)
155156
}
156157
}
157158

agent/consul/client_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func TestClient_JoinLAN(t *testing.T) {
115115
joinLAN(t, c1, s1)
116116
testrpc.WaitForTestAgent(t, c1.RPC, "dc1")
117117
retry.Run(t, func(r *retry.R) {
118-
if got, want := c1.routers.NumServers(), 1; got != want {
118+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
119119
r.Fatalf("got %d servers want %d", got, want)
120120
}
121121
if got, want := len(s1.LANMembers()), 2; got != want {
@@ -153,7 +153,7 @@ func TestClient_LANReap(t *testing.T) {
153153

154154
// Check the router has both
155155
retry.Run(t, func(r *retry.R) {
156-
server := c1.routers.FindServer()
156+
server := c1.router.FindLANServer()
157157
require.NotNil(t, server)
158158
require.Equal(t, s1.config.NodeName, server.Name)
159159
})
@@ -163,7 +163,7 @@ func TestClient_LANReap(t *testing.T) {
163163

164164
retry.Run(t, func(r *retry.R) {
165165
require.Len(r, c1.LANMembers(), 1)
166-
server := c1.routers.FindServer()
166+
server := c1.router.FindLANServer()
167167
require.Nil(t, server)
168168
})
169169
}
@@ -393,7 +393,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
393393
}
394394

395395
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete
396-
c.routers.ResetRebalanceTimer()
396+
c.router.GetLANManager().ResetRebalanceTimer()
397397
time.Sleep(time.Second)
398398

399399
if len(c.LANMembers()) != numServers+numClients {
@@ -409,7 +409,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
409409
var pingCount int
410410
for range servers {
411411
time.Sleep(200 * time.Millisecond)
412-
s := c.routers.FindServer()
412+
m, s := c.router.FindLANRoute()
413413
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
414414
if !ok {
415415
t.Errorf("Unable to ping server %v: %s", s.String(), err)
@@ -418,7 +418,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
418418

419419
// Artificially fail the server in order to rotate the server
420420
// list
421-
c.routers.NotifyFailedServer(s)
421+
m.NotifyFailedServer(s)
422422
}
423423

424424
if pingCount != numServers {
@@ -527,7 +527,7 @@ func TestClient_SnapshotRPC(t *testing.T) {
527527

528528
// Wait until we've got a healthy server.
529529
retry.Run(t, func(r *retry.R) {
530-
if got, want := c1.routers.NumServers(), 1; got != want {
530+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
531531
r.Fatalf("got %d servers want %d", got, want)
532532
}
533533
})
@@ -562,7 +562,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
562562

563563
joinLAN(t, c1, s1)
564564
retry.Run(t, func(r *retry.R) {
565-
if got, want := c1.routers.NumServers(), 1; got != want {
565+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
566566
r.Fatalf("got %d servers want %d", got, want)
567567
}
568568
})
@@ -610,7 +610,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
610610
}
611611

612612
// Wait until we've got a healthy server.
613-
if got, want := c1.routers.NumServers(), 1; got != want {
613+
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
614614
r.Fatalf("got %d servers want %d", got, want)
615615
}
616616
})

agent/consul/coordinate_endpoint.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/hashicorp/consul/agent/consul/state"
1111
"github.com/hashicorp/consul/agent/structs"
1212
"github.com/hashicorp/consul/logging"
13+
"github.com/hashicorp/consul/types"
1314
"github.com/hashicorp/go-hclog"
1415
"github.com/hashicorp/go-memdb"
1516
)
@@ -161,23 +162,32 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
161162

162163
// ListDatacenters returns the list of datacenters and their respective nodes
163164
// and the raw coordinates of those nodes (if no coordinates are available for
164-
// any of the nodes, the node list may be empty).
165+
// any of the nodes, the node list may be empty). This endpoint will not return
166+
// information about the LAN network area.
165167
func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
166168
maps, err := c.srv.router.GetDatacenterMaps()
167169
if err != nil {
168170
return err
169171
}
170172

173+
var out []structs.DatacenterMap
174+
171175
// Strip the datacenter suffixes from all the node names.
172-
for i := range maps {
173-
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
174-
for j := range maps[i].Coordinates {
175-
node := maps[i].Coordinates[j].Node
176-
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
176+
for _, dcMap := range maps {
177+
if dcMap.AreaID == types.AreaLAN {
178+
continue
179+
}
180+
181+
suffix := fmt.Sprintf(".%s", dcMap.Datacenter)
182+
for j := range dcMap.Coordinates {
183+
node := dcMap.Coordinates[j].Node
184+
dcMap.Coordinates[j].Node = strings.TrimSuffix(node, suffix)
177185
}
186+
187+
out = append(out, dcMap)
178188
}
179189

180-
*reply = maps
190+
*reply = out
181191
return nil
182192
}
183193

agent/consul/options.go

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consul
22

33
import (
44
"github.com/hashicorp/consul/agent/pool"
5+
"github.com/hashicorp/consul/agent/router"
56
"github.com/hashicorp/consul/agent/token"
67
"github.com/hashicorp/consul/tlsutil"
78
"github.com/hashicorp/go-hclog"
@@ -12,6 +13,7 @@ type consulOptions struct {
1213
tlsConfigurator *tlsutil.Configurator
1314
connPool *pool.ConnPool
1415
tokens *token.Store
16+
router *router.Router
1517
}
1618

1719
type ConsulOption func(*consulOptions)
@@ -40,6 +42,12 @@ func WithTokenStore(tokens *token.Store) ConsulOption {
4042
}
4143
}
4244

45+
func WithRouter(router *router.Router) ConsulOption {
46+
return func(opt *consulOptions) {
47+
opt.router = router
48+
}
49+
}
50+
4351
func flattenConsulOptions(options []ConsulOption) consulOptions {
4452
var flat consulOptions
4553
for _, opt := range options {

0 commit comments

Comments
 (0)