Skip to content

Commit c6ef1d8

Browse files
authored
Merge pull request #2175 from hashicorp/f-hold-rpc
Gracefully handle short lived outages by holding RPC calls
2 parents 64e3033 + 86a63ee commit c6ef1d8

File tree

3 files changed

+67
-16
lines changed

3 files changed

+67
-16
lines changed

consul/catalog_endpoint_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestCatalogRegister(t *testing.T) {
3434
var out struct{}
3535

3636
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
37-
if err == nil || err.Error() != "No cluster leader" {
37+
if err != nil {
3838
t.Fatalf("err: %v", err)
3939
}
4040

@@ -198,7 +198,7 @@ func TestCatalogDeregister(t *testing.T) {
198198
var out struct{}
199199

200200
err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out)
201-
if err == nil || err.Error() != "No cluster leader" {
201+
if err != nil {
202202
t.Fatalf("err: %v", err)
203203
}
204204

@@ -302,7 +302,7 @@ func TestCatalogListNodes(t *testing.T) {
302302
}
303303
var out structs.IndexedNodes
304304
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
305-
if err == nil || err.Error() != "No cluster leader" {
305+
if err != nil {
306306
t.Fatalf("err: %v", err)
307307
}
308308

@@ -621,7 +621,7 @@ func TestCatalogListServices(t *testing.T) {
621621
}
622622
var out structs.IndexedServices
623623
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
624-
if err == nil || err.Error() != "No cluster leader" {
624+
if err != nil {
625625
t.Fatalf("err: %v", err)
626626
}
627627

@@ -810,7 +810,7 @@ func TestCatalogListServiceNodes(t *testing.T) {
810810
}
811811
var out structs.IndexedServiceNodes
812812
err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out)
813-
if err == nil || err.Error() != "No cluster leader" {
813+
if err != nil {
814814
t.Fatalf("err: %v", err)
815815
}
816816

@@ -857,7 +857,7 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
857857
}
858858
var out structs.IndexedServiceNodes
859859
err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out)
860-
if err == nil || err.Error() != "No cluster leader" {
860+
if err != nil {
861861
t.Fatalf("err: %v", err)
862862
}
863863

@@ -944,7 +944,7 @@ func TestCatalogNodeServices(t *testing.T) {
944944
}
945945
var out structs.IndexedNodeServices
946946
err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out)
947-
if err == nil || err.Error() != "No cluster leader" {
947+
if err != nil {
948948
t.Fatalf("err: %v", err)
949949
}
950950

@@ -1001,7 +1001,7 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
10011001
var out struct{}
10021002

10031003
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
1004-
if err == nil || err.Error() != "No cluster leader" {
1004+
if err != nil {
10051005
t.Fatalf("err: %v", err)
10061006
}
10071007

consul/config.go

+10
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,13 @@ type Config struct {
224224
// are willing to apply in one period. After this limit we will issue a
225225
// warning and discard the remaining updates.
226226
CoordinateUpdateMaxBatches int
227+
228+
// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
229+
// This is used to paper over a loss of leadership by instead holding RPCs,
230+
// so that the caller experiences a slow response rather than an error.
231+
// This period is meant to be long enough for a leader election to take
232+
// place, and a small jitter is applied to avoid a thundering herd.
233+
RPCHoldTimeout time.Duration
227234
}
228235

229236
// CheckVersion is used to check if the ProtocolVersion is valid
@@ -286,6 +293,9 @@ func DefaultConfig() *Config {
286293
CoordinateUpdatePeriod: 5 * time.Second,
287294
CoordinateUpdateBatchSize: 128,
288295
CoordinateUpdateMaxBatches: 5,
296+
297+
// Hold an RPC for up to 5 seconds by default
298+
RPCHoldTimeout: 5 * time.Second,
289299
}
290300

291301
// Increase our reap interval to 3 days instead of 24h.

consul/rpc.go

+49-8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/armon/go-metrics"
13+
"github.com/hashicorp/consul/consul/agent"
1314
"github.com/hashicorp/consul/consul/state"
1415
"github.com/hashicorp/consul/consul/structs"
1516
"github.com/hashicorp/consul/lib"
@@ -39,7 +40,8 @@ const (
3940

4041
// jitterFraction is the limit to the amount of jitter we apply
4142
// to a user specified MaxQueryTime. We divide the specified time by
42-
// the fraction. So 16 == 6.25% limit of jitter
43+
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
44+
// is applied to the RPCHoldTimeout
4345
jitterFraction = 16
4446

4547
// Warn if the Raft command is larger than this.
@@ -189,6 +191,8 @@ func (s *Server) handleConsulConn(conn net.Conn) {
189191
// forward is used to forward to a remote DC or to forward to the local leader
190192
// Returns a bool of if forwarding was performed, as well as any error
191193
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
194+
var firstCheck time.Time
195+
192196
// Handle DC forwarding
193197
dc := info.RequestDatacenter()
194198
if dc != s.config.Datacenter {
@@ -201,27 +205,64 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
201205
return false, nil
202206
}
203207

204-
// Handle leader forwarding
205-
if !s.IsLeader() {
206-
err := s.forwardLeader(method, args, reply)
208+
CHECK_LEADER:
209+
// Find the leader
210+
isLeader, remoteServer := s.getLeader()
211+
212+
// Handle the case we are the leader
213+
if isLeader {
214+
return false, nil
215+
}
216+
217+
// Handle the case of a known leader
218+
if remoteServer != nil {
219+
err := s.forwardLeader(remoteServer, method, args, reply)
207220
return true, err
208221
}
209-
return false, nil
222+
223+
// Gate the request until there is a leader
224+
if firstCheck.IsZero() {
225+
firstCheck = time.Now()
226+
}
227+
if time.Now().Sub(firstCheck) < s.config.RPCHoldTimeout {
228+
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
229+
select {
230+
case <-time.After(jitter):
231+
goto CHECK_LEADER
232+
case <-s.shutdownCh:
233+
}
234+
}
235+
236+
// No leader found and hold time exceeded
237+
return true, structs.ErrNoLeader
210238
}
211239

212-
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
213-
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
240+
// getLeader returns if the current node is the leader, and if not
241+
// then it returns the leader which is potentially nil if the cluster
242+
// has not yet elected a leader.
243+
func (s *Server) getLeader() (bool, *agent.Server) {
244+
// Check if we are the leader
245+
if s.IsLeader() {
246+
return true, nil
247+
}
248+
214249
// Get the leader
215250
leader := s.raft.Leader()
216251
if leader == "" {
217-
return structs.ErrNoLeader
252+
return false, nil
218253
}
219254

220255
// Lookup the server
221256
s.localLock.RLock()
222257
server := s.localConsuls[leader]
223258
s.localLock.RUnlock()
224259

260+
// Server could be nil
261+
return false, server
262+
}
263+
264+
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
265+
func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error {
225266
// Handle a missing server
226267
if server == nil {
227268
return structs.ErrNoLeader

0 commit comments

Comments
 (0)