Skip to content

Commit 1f57ee6

Browse files
authored
Merge pull request #47 from mihaitodor/service-draining-status
Add service DRAINING status and a handler for the `drain` HTTP API endpoint
2 parents 8912f89 + c3d6bc6 commit 1f57ee6

14 files changed

+346
-36
lines changed

catalog/services_state.go

+38-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks
3131
TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission
3232
ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds
33+
DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes
3334
ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks
3435
ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute
3536
)
@@ -132,6 +133,11 @@ func (state *ServicesState) ProcessServiceMsgs(looper director.Looper) {
132133
})
133134
}
134135

136+
// UpdateService enqueues a state update for a given service
137+
func (state *ServicesState) UpdateService(svc service.Service) {
138+
state.ServiceMsgs <- svc
139+
}
140+
135141
// Shortcut for checking if the Servers map has an entry for this
136142
// hostname.
137143
func (state *ServicesState) HasServer(hostname string) bool {
@@ -304,6 +310,11 @@ func (state *ServicesState) AddServiceEntry(newSvc service.Service) {
304310
// Store the previous newSvc so we can compare it
305311
oldEntry := server.Services[newSvc.ID]
306312

313+
// Make sure we preserve the DRAINING status for services
314+
if oldEntry.Status == service.DRAINING && newSvc.Status == service.ALIVE {
315+
newSvc.Status = oldEntry.Status
316+
}
317+
307318
// Update the new one
308319
server.Services[newSvc.ID] = &newSvc
309320

@@ -320,12 +331,28 @@ func (state *ServicesState) AddServiceEntry(newSvc service.Service) {
320331
}
321332
}
322333

334+
// GetLocalServiceByID returns a service for a given ID if it
335+
// happens to exist on the current host. Returns an error otherwise.
336+
func (state *ServicesState) GetLocalServiceByID(id string) (service.Service, error) {
337+
state.RLock()
338+
defer state.RUnlock()
339+
340+
if server, ok := state.Servers[state.Hostname]; ok {
341+
if svc, ok := server.Services[id]; ok {
342+
return *svc, nil
343+
}
344+
}
345+
346+
return service.Service{},
347+
fmt.Errorf("service with ID %q not found on host %q", id, state.Hostname)
348+
}
349+
323350
// Merge a complete state struct into this one. Usually used on
324351
// node startup and during anti-entropy operations.
325352
func (state *ServicesState) Merge(otherState *ServicesState) {
326353
for _, server := range otherState.Servers {
327-
for _, service := range server.Services {
328-
state.ServiceMsgs <- *service
354+
for _, svc := range server.Services {
355+
state.UpdateService(*svc)
329356
}
330357
}
331358
}
@@ -403,8 +430,8 @@ func (state *ServicesState) Print(list *memberlist.Memberlist) {
403430
// don't already know about.
404431
func (state *ServicesState) TrackNewServices(fn func() []service.Service, looper director.Looper) {
405432
looper.Loop(func() error {
406-
for _, container := range fn() {
407-
state.ServiceMsgs <- container
433+
for _, svc := range fn() {
434+
state.UpdateService(svc)
408435
}
409436
return nil
410437
})
@@ -610,12 +637,16 @@ func (state *ServicesState) TombstoneOthersServices() []service.Service {
610637
}
611638
}
612639

640+
svcLifespan := ALIVE_LIFESPAN
641+
if svc.IsDraining() {
642+
svcLifespan = DRAINING_LIFESPAN
643+
}
613644
// Everything that is not tombstoned needs to be considered for
614645
// removal if it exceeds the allowed ALIVE_TIMESPAN
615646
if !svc.IsTombstone() &&
616-
svc.Updated.Before(time.Now().UTC().Add(0-ALIVE_LIFESPAN)) {
617-
log.Warnf("Found expired service %s from %s, tombstoning",
618-
svc.Name, svc.Hostname,
647+
svc.Updated.Before(time.Now().UTC().Add(0-svcLifespan)) {
648+
log.Warnf("Found expired service %s ID %s from %s, tombstoning",
649+
svc.Name, svc.ID, svc.Hostname,
619650
)
620651

621652
// Because we don't know that other hosts haven't gotten a newer

catalog/services_state_test.go

+83
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,59 @@ func Test_ServicesStateWithData(t *testing.T) {
219219
}
220220
So(pendingBroadcast, ShouldBeFalse)
221221
})
222+
223+
Convey("Sets a service's status to DRAINING", func() {
224+
state.AddServiceEntry(svc)
225+
226+
svc.Status = service.DRAINING
227+
svc.Updated = time.Now().UTC()
228+
229+
state.AddServiceEntry(svc)
230+
231+
So(state.HasServer(anotherHostname), ShouldBeTrue)
232+
So(state.Servers[anotherHostname].Services[svc.ID].Status,
233+
ShouldEqual, service.DRAINING)
234+
})
235+
236+
Convey("Doesn't mark a DRAINING service as ALIVE", func() {
237+
svc.Status = service.DRAINING
238+
state.AddServiceEntry(svc)
239+
240+
svc.Status = service.ALIVE
241+
svc.Updated = time.Now().UTC()
242+
243+
state.AddServiceEntry(svc)
244+
245+
So(state.HasServer(anotherHostname), ShouldBeTrue)
246+
So(state.Servers[anotherHostname].Services[svc.ID].Status,
247+
ShouldEqual, service.DRAINING)
248+
})
249+
})
250+
251+
Convey("GetLocalServiceByID()", func() {
252+
Convey("Returns an existing service on the current host", func() {
253+
state.Hostname = anotherHostname
254+
state.AddServiceEntry(svc)
255+
256+
returnedSvc, err := state.GetLocalServiceByID(svc.ID)
257+
So(err, ShouldBeNil)
258+
So(returnedSvc.ID, ShouldEqual, svc.ID)
259+
})
260+
261+
Convey("Doesn't return a service running on other hosts", func() {
262+
state.AddServiceEntry(svc)
263+
264+
_, err := state.GetLocalServiceByID(svc.ID)
265+
So(err, ShouldNotBeNil)
266+
})
267+
268+
Convey("Returns an error for a non-existent service ID", func() {
269+
state.AddServiceEntry(svc)
270+
271+
_, err := state.GetLocalServiceByID("missing")
272+
So(err, ShouldNotBeNil)
273+
So(err.Error(), ShouldContainSubstring, "not found")
274+
})
222275
})
223276

224277
Convey("Merge() merges state we care about from other state structs", func() {
@@ -416,6 +469,36 @@ func Test_TrackingAndBroadcasting(t *testing.T) {
416469
So(state.Servers[hostname].LastChanged.After(lastChanged), ShouldBeTrue)
417470
})
418471

472+
Convey("Draining services have a lifespan and then are tombstoned", func() {
473+
lastChanged := state.Servers[hostname].LastChanged
474+
service1.Status = service.DRAINING
475+
state.AddServiceEntry(service1)
476+
svc := state.Servers[hostname].Services[service1.ID]
477+
stamp := service1.Updated.Add(0 - DRAINING_LIFESPAN - 5*time.Second)
478+
svc.Updated = stamp
479+
480+
state.TombstoneOthersServices()
481+
482+
So(svc.Status, ShouldEqual, service.TOMBSTONE)
483+
So(svc.Updated, ShouldBeTheSameTimeAs, stamp.Add(time.Second))
484+
So(state.Servers[hostname].LastChanged.After(lastChanged), ShouldBeTrue)
485+
})
486+
487+
Convey("Draining services are not tombstoned before their lifespan expires", func() {
488+
lastChanged := state.Servers[hostname].LastChanged
489+
service1.Status = service.DRAINING
490+
state.AddServiceEntry(service1)
491+
svc := state.Servers[hostname].Services[service1.ID]
492+
stamp := service1.Updated.Add(0 - ALIVE_LIFESPAN - 5*time.Second)
493+
svc.Updated = stamp
494+
495+
state.TombstoneOthersServices()
496+
497+
So(svc.Status, ShouldEqual, service.DRAINING)
498+
So(svc.Updated, ShouldBeTheSameTimeAs, stamp)
499+
So(state.Servers[hostname].LastChanged.After(lastChanged), ShouldBeTrue)
500+
})
501+
419502
Convey("Unhealthy/Unknown services have a lifespan and then are tombstoned", func() {
420503
unhealthyService := service.Service{ID: "unhealthy_shakespeare", Hostname: hostname, Updated: baseTime, Status: service.UNHEALTHY}
421504
state.AddServiceEntry(unhealthyService)

healthy/healthy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
"time"
1212

1313
"github.com/Nitro/sidecar/service"
14-
log "github.com/sirupsen/logrus"
1514
"github.com/relistan/go-director"
15+
log "github.com/sirupsen/logrus"
1616
)
1717

1818
const (

receiver/http_test.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func Test_updateHandler(t *testing.T) {
105105
So(received, ShouldBeTrue)
106106
})
107107

108-
Convey("enqueus all updates if no Subscriptions are provided", func() {
108+
Convey("enqueues all updates if no Subscriptions are provided", func() {
109109
evtState := deepcopy.Copy(state).(*catalog.ServicesState)
110110
evtState.LastChanged = time.Now().UTC()
111111

@@ -223,5 +223,33 @@ func Test_updateHandler(t *testing.T) {
223223
So(lastReceivedState.LastChanged.Before(state.LastChanged), ShouldBeTrue)
224224
So(len(lastReceivedState.Servers["chaucer"].Services), ShouldEqual, 2)
225225
})
226+
227+
Convey("enqueues an update to mark a service as DRAINING", func() {
228+
evtState := deepcopy.Copy(state).(*catalog.ServicesState)
229+
evtState.LastChanged = time.Now().UTC()
230+
231+
change := catalog.StateChangedEvent{
232+
State: evtState,
233+
ChangeEvent: catalog.ChangeEvent{
234+
Service: service.Service{
235+
Name: "nobody-wants-this",
236+
ID: "10101010101",
237+
Updated: time.Now().UTC(),
238+
Created: time.Now().UTC(),
239+
Status: service.DRAINING,
240+
},
241+
PreviousStatus: service.ALIVE,
242+
},
243+
}
244+
245+
encoded, _ := json.Marshal(change)
246+
req := httptest.NewRequest("POST", "/update", bytes.NewBuffer(encoded))
247+
248+
UpdateHandler(recorder, req, rcvr)
249+
resp := recorder.Result()
250+
251+
So(resp.StatusCode, ShouldEqual, 200)
252+
So(len(rcvr.ReloadChan), ShouldEqual, 1)
253+
})
226254
})
227255
}

receiver/receiver.go

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ func ShouldNotify(oldStatus int, newStatus int) bool {
5757
if oldStatus == service.ALIVE {
5858
return true
5959
}
60+
case service.DRAINING:
61+
return true
6062
default:
6163
log.Errorf("Got unknown service change status: %d", newStatus)
6264
return false

service/service.go

+7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
TOMBSTONE = iota
2020
UNHEALTHY = iota
2121
UNKNOWN = iota
22+
DRAINING = iota
2223
)
2324

2425
type Port struct {
@@ -56,6 +57,10 @@ func (svc *Service) IsTombstone() bool {
5657
return svc.Status == TOMBSTONE
5758
}
5859

60+
func (svc *Service) IsDraining() bool {
61+
return svc.Status == DRAINING
62+
}
63+
5964
func (svc *Service) Invalidates(otherSvc *Service) bool {
6065
return otherSvc != nil && svc.Updated.After(otherSvc.Updated)
6166
}
@@ -162,6 +167,8 @@ func StatusString(status int) string {
162167
return "Unhealthy"
163168
case UNKNOWN:
164169
return "Unknown"
170+
case DRAINING:
171+
return "Draining"
165172
default:
166173
return "Tombstone"
167174
}

services_delegate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (d *servicesDelegate) Start() {
5050
log.Errorf("Start(): error decoding message: %s", err)
5151
continue
5252
}
53-
d.state.ServiceMsgs <- *entry
53+
d.state.UpdateService(*entry)
5454
}
5555
}()
5656

sidecarhttp/envoy_api_test.go

-11
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package sidecarhttp
22

33
import (
44
"encoding/json"
5-
"io/ioutil"
6-
"net/http"
75
"net/http/httptest"
86
"testing"
97
"time"
@@ -290,12 +288,3 @@ func Test_listenersHandler(t *testing.T) {
290288
})
291289
})
292290
}
293-
294-
// getResult fetchs the status code, headers, and body from a recorder
295-
func getResult(recorder *httptest.ResponseRecorder) (code int, headers *http.Header, body string) {
296-
resp := recorder.Result()
297-
bodyBytes, _ := ioutil.ReadAll(resp.Body)
298-
body = string(bodyBytes)
299-
300-
return resp.StatusCode, &resp.Header, body
301-
}

sidecarhttp/http_api.go

+53
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type SidecarApi struct {
3535
func (s *SidecarApi) HttpMux() http.Handler {
3636
router := mux.NewRouter()
3737
router.HandleFunc("/services/{name}.{extension}", wrap(s.oneServiceHandler)).Methods("GET")
38+
router.HandleFunc("/services/{id}/drain", wrap(s.drainServiceHandler)).Methods("POST")
3839
router.HandleFunc("/services.{extension}", wrap(s.servicesHandler)).Methods("GET")
3940
router.HandleFunc("/state.{extension}", wrap(s.stateHandler)).Methods("GET")
4041
router.HandleFunc("/watch", wrap(s.watchHandler)).Methods("GET")
@@ -289,6 +290,58 @@ func (s *SidecarApi) stateHandler(response http.ResponseWriter, req *http.Reques
289290
}
290291
}
291292

293+
// drainServiceHandler instructs Sidecar to set the status of a given service
294+
// instance to DRAINING. This allows us to decomission the given service
295+
// instance and let it sit around for a short amount of time, so it can finish
296+
// processing the requests that are still in flight.
297+
func (s *SidecarApi) drainServiceHandler(response http.ResponseWriter, req *http.Request, params map[string]string) {
298+
defer req.Body.Close()
299+
300+
if req.Method != http.MethodPost {
301+
sendJsonError(response, 400, fmt.Sprintf("Bad request - Method %q not allowed", req.Method))
302+
return
303+
}
304+
305+
serviceID, ok := params["id"]
306+
if !ok {
307+
sendJsonError(response, 404, "Not Found - No service ID provided")
308+
return
309+
}
310+
311+
if s.state == nil {
312+
sendJsonError(response, 500, "Internal Server Error - Something went terribly wrong")
313+
return
314+
}
315+
316+
svc, err := s.state.GetLocalServiceByID(serviceID)
317+
if err != nil {
318+
sendJsonError(response, 404, fmt.Sprintf("Not Found - Service ID %q not found", serviceID))
319+
return
320+
}
321+
322+
svc.Updated = time.Now()
323+
svc.Status = service.DRAINING
324+
s.state.UpdateService(svc)
325+
326+
result := struct {
327+
Message string
328+
}{
329+
Message: fmt.Sprintf("Service %q instance %q set to DRAINING", svc.Name, svc.ID),
330+
}
331+
jsonBytes, err := json.MarshalIndent(&result, "", " ")
332+
if err != nil {
333+
sendJsonError(response, 500, "Internal Server Error - Something went terribly wrong")
334+
return
335+
}
336+
337+
response.Header().Set("Content-Type", "application/json")
338+
response.WriteHeader(202)
339+
_, err = response.Write(jsonBytes)
340+
if err != nil {
341+
log.Errorf("Error writing drain service response to client: %s", err)
342+
}
343+
}
344+
292345
// Send back a JSON encoded error and message
293346
func sendJsonError(response http.ResponseWriter, status int, message string) {
294347
output := map[string]string{

0 commit comments

Comments
 (0)