Skip to content

Commit 69fb484

Browse files
author
Thibault Gilles
committed
Register a service and its check atomically in the local state
Prevent a race between register and deregister requests by saving a service and its checks together in the local state on registration. Also adds more cleanup in case of failure when registering services / checks.
1 parent efc180a commit 69fb484

File tree

3 files changed

+156
-36
lines changed

3 files changed

+156
-36
lines changed

agent/agent.go

+113-32
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import (
2020

2121
"google.golang.org/grpc"
2222

23-
"github.com/armon/go-metrics"
23+
metrics "github.com/armon/go-metrics"
2424
"github.com/hashicorp/consul/acl"
2525
"github.com/hashicorp/consul/agent/ae"
2626
"github.com/hashicorp/consul/agent/cache"
27-
"github.com/hashicorp/consul/agent/cache-types"
27+
cachetype "github.com/hashicorp/consul/agent/cache-types"
2828
"github.com/hashicorp/consul/agent/checks"
2929
"github.com/hashicorp/consul/agent/config"
3030
"github.com/hashicorp/consul/agent/consul"
@@ -42,8 +42,8 @@ import (
4242
"github.com/hashicorp/consul/logger"
4343
"github.com/hashicorp/consul/types"
4444
"github.com/hashicorp/consul/watch"
45-
"github.com/hashicorp/go-multierror"
46-
"github.com/hashicorp/go-uuid"
45+
multierror "github.com/hashicorp/go-multierror"
46+
uuid "github.com/hashicorp/go-uuid"
4747
"github.com/hashicorp/memberlist"
4848
"github.com/hashicorp/raft"
4949
"github.com/hashicorp/serf/serf"
@@ -1911,15 +1911,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
19111911
snap := a.snapshotCheckState()
19121912
defer a.restoreCheckState(snap)
19131913

1914-
// Add the service
1915-
a.State.AddService(service, token)
1916-
1917-
// Persist the service to a file
1918-
if persist && a.config.DataDir != "" {
1919-
if err := a.persistService(service); err != nil {
1920-
return err
1921-
}
1922-
}
1914+
var checks []*structs.HealthCheck
19231915

19241916
// Create an associated health check
19251917
for i, chkType := range chkTypes {
@@ -1947,14 +1939,81 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
19471939
if chkType.Status != "" {
19481940
check.Status = chkType.Status
19491941
}
1950-
if err := a.AddCheck(check, chkType, persist, token, source); err != nil {
1942+
1943+
checks = append(checks, check)
1944+
}
1945+
1946+
// cleanup, store the ids of services and checks that weren't previously
1947+
// registered so we clean them up if something fails halfway through the
1948+
// process.
1949+
var cleanupServices []string
1950+
var cleanupChecks []types.CheckID
1951+
1952+
if s := a.State.Service(service.ID); s == nil {
1953+
cleanupServices = append(cleanupServices, service.ID)
1954+
}
1955+
1956+
for _, check := range checks {
1957+
if c := a.State.Check(check.CheckID); c == nil {
1958+
cleanupChecks = append(cleanupChecks, check.CheckID)
1959+
}
1960+
}
1961+
1962+
err := a.State.AddServiceWithChecks(service, checks, token)
1963+
if err != nil {
1964+
a.cleanupRegistration(cleanupServices, cleanupChecks)
1965+
return err
1966+
}
1967+
1968+
for i := range checks {
1969+
if err := a.addCheck(checks[i], chkTypes[i], service, persist, token, source); err != nil {
1970+
a.cleanupRegistration(cleanupServices, cleanupChecks)
1971+
return err
1972+
}
1973+
1974+
if persist && a.config.DataDir != "" {
1975+
if err := a.persistCheck(checks[i], chkTypes[i]); err != nil {
1976+
a.cleanupRegistration(cleanupServices, cleanupChecks)
1977+
return err
1978+
1979+
}
1980+
}
1981+
}
1982+
1983+
// Persist the service to a file
1984+
if persist && a.config.DataDir != "" {
1985+
if err := a.persistService(service); err != nil {
1986+
a.cleanupRegistration(cleanupServices, cleanupChecks)
19511987
return err
19521988
}
19531989
}
19541990

19551991
return nil
19561992
}
19571993

1994+
// cleanupRegistration is called on registration error to ensure there are no
1995+
// leftovers after a partial failure
1996+
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
1997+
for _, s := range serviceIDs {
1998+
if err := a.State.RemoveService(s); err != nil {
1999+
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove service %s: %s", s, err)
2000+
}
2001+
if err := a.purgeService(s); err != nil {
2002+
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", s, err)
2003+
}
2004+
}
2005+
2006+
for _, c := range checksIDs {
2007+
a.cancelCheckMonitors(c)
2008+
if err := a.State.RemoveCheck(c); err != nil {
2009+
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to remove check %s: %s", c, err)
2010+
}
2011+
if err := a.purgeCheck(c); err != nil {
2012+
a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge check %s file: %s", c, err)
2013+
}
2014+
}
2015+
}
2016+
19582017
// RemoveService is used to remove a service entry.
19592018
// The agent will make a best effort to ensure it is deregistered
19602019
func (a *Agent) RemoveService(serviceID string, persist bool) error {
@@ -2018,6 +2077,44 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
20182077
// ensure it is registered. The Check may include a CheckType which
20192078
// is used to automatically update the check status
20202079
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, persist bool, token string, source configSource) error {
2080+
var service *structs.NodeService
2081+
2082+
if check.ServiceID != "" {
2083+
service = a.State.Service(check.ServiceID)
2084+
if service == nil {
2085+
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
2086+
}
2087+
}
2088+
2089+
// snapshot the current state of the health check to avoid potential flapping
2090+
existing := a.State.Check(check.CheckID)
2091+
defer func() {
2092+
if existing != nil {
2093+
a.State.UpdateCheck(check.CheckID, existing.Status, existing.Output)
2094+
}
2095+
}()
2096+
2097+
err := a.addCheck(check, chkType, service, persist, token, source)
2098+
if err != nil {
2099+
a.State.RemoveCheck(check.CheckID)
2100+
return err
2101+
}
2102+
2103+
// Add to the local state for anti-entropy
2104+
err = a.State.AddCheck(check, token)
2105+
if err != nil {
2106+
return err
2107+
}
2108+
2109+
// Persist the check
2110+
if persist && a.config.DataDir != "" {
2111+
return a.persistCheck(check, chkType)
2112+
}
2113+
2114+
return nil
2115+
}
2116+
2117+
func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, service *structs.NodeService, persist bool, token string, source configSource) error {
20212118
if check.CheckID == "" {
20222119
return fmt.Errorf("CheckID missing")
20232120
}
@@ -2039,12 +2136,8 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
20392136
}
20402137

20412138
if check.ServiceID != "" {
2042-
s := a.State.Service(check.ServiceID)
2043-
if s == nil {
2044-
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
2045-
}
2046-
check.ServiceName = s.Service
2047-
check.ServiceTags = s.Tags
2139+
check.ServiceName = service.Service
2140+
check.ServiceTags = service.Tags
20482141
}
20492142

20502143
a.checkLock.Lock()
@@ -2265,18 +2358,6 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
22652358
}
22662359
}
22672360

2268-
// Add to the local state for anti-entropy
2269-
err := a.State.AddCheck(check, token)
2270-
if err != nil {
2271-
a.cancelCheckMonitors(check.CheckID)
2272-
return err
2273-
}
2274-
2275-
// Persist the check
2276-
if persist && a.config.DataDir != "" {
2277-
return a.persistCheck(check, chkType)
2278-
}
2279-
22802361
return nil
22812362
}
22822363

agent/agent_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1839,7 +1839,7 @@ func TestAgent_PersistCheck(t *testing.T) {
18391839
t.Fatalf("err: %s", err)
18401840
}
18411841
if !bytes.Equal(expected, content) {
1842-
t.Fatalf("bad: %s", string(content))
1842+
t.Fatalf("bad: %s != %s", string(content), expected)
18431843
}
18441844

18451845
// Updates the check definition on disk

agent/local/state.go

+42-3
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ func (l *State) serviceToken(id string) string {
265265
// This entry is persistent and the agent will make a best effort to
266266
// ensure it is registered
267267
func (l *State) AddService(service *structs.NodeService, token string) error {
268+
l.Lock()
269+
defer l.Unlock()
270+
return l.addServiceLocked(service, token)
271+
}
272+
273+
func (l *State) addServiceLocked(service *structs.NodeService, token string) error {
268274
if service == nil {
269275
return fmt.Errorf("no service")
270276
}
@@ -274,13 +280,31 @@ func (l *State) AddService(service *structs.NodeService, token string) error {
274280
service.ID = service.Service
275281
}
276282

277-
l.SetServiceState(&ServiceState{
283+
l.setServiceStateLocked(&ServiceState{
278284
Service: service,
279285
Token: token,
280286
})
281287
return nil
282288
}
283289

290+
// AddServiceWithChecks adds a service and its checks to the local state atomically
291+
func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error {
292+
l.Lock()
293+
defer l.Unlock()
294+
295+
if err := l.addServiceLocked(service, token); err != nil {
296+
return err
297+
}
298+
299+
for _, check := range checks {
300+
if err := l.addCheckLocked(check, token); err != nil {
301+
return err
302+
}
303+
}
304+
305+
return nil
306+
}
307+
284308
// RemoveService is used to remove a service entry from the local state.
285309
// The agent will make a best effort to ensure it is deregistered.
286310
func (l *State) RemoveService(id string) error {
@@ -358,6 +382,10 @@ func (l *State) SetServiceState(s *ServiceState) {
358382
l.Lock()
359383
defer l.Unlock()
360384

385+
l.setServiceStateLocked(s)
386+
}
387+
388+
func (l *State) setServiceStateLocked(s *ServiceState) {
361389
s.WatchCh = make(chan struct{})
362390

363391
old, hasOld := l.services[s.Service.ID]
@@ -414,6 +442,13 @@ func (l *State) checkToken(id types.CheckID) string {
414442
// This entry is persistent and the agent will make a best effort to
415443
// ensure it is registered
416444
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
445+
l.Lock()
446+
defer l.Unlock()
447+
448+
return l.addCheckLocked(check, token)
449+
}
450+
451+
func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
417452
if check == nil {
418453
return fmt.Errorf("no check")
419454
}
@@ -427,14 +462,14 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
427462

428463
// if there is a serviceID associated with the check, make sure it exists before adding it
429464
// NOTE - This logic may be moved to be handled within the Agent's AddCheck method after a refactor
430-
if check.ServiceID != "" && l.Service(check.ServiceID) == nil {
465+
if _, ok := l.services[check.ServiceID]; check.ServiceID != "" && !ok {
431466
return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
432467
}
433468

434469
// hard-set the node name
435470
check.Node = l.config.NodeName
436471

437-
l.SetCheckState(&CheckState{
472+
l.setCheckStateLocked(&CheckState{
438473
Check: check,
439474
Token: token,
440475
})
@@ -620,6 +655,10 @@ func (l *State) SetCheckState(c *CheckState) {
620655
l.Lock()
621656
defer l.Unlock()
622657

658+
l.setCheckStateLocked(c)
659+
}
660+
661+
func (l *State) setCheckStateLocked(c *CheckState) {
623662
l.checks[c.Check.CheckID] = c
624663
l.TriggerSyncChanges()
625664
}

0 commit comments

Comments
 (0)