Skip to content

Commit ce46b76

Browse files
authored
Merge pull request #27 from Nitro/relistan/fix-watch-endpoint
Convert /watch to Listener & don't leak goroutines
2 parents 1580d73 + aee2dfd commit ce46b76

File tree

3 files changed

+73
-18
lines changed

3 files changed

+73
-18
lines changed

catalog/services_state.go

+17
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,23 @@ func (state *ServicesState) AddListener(listener Listener) {
230230
log.Debugf("AddListener(): added %s, new count %d", listener.Name(), len(state.listeners))
231231
}
232232

233+
// Remove an event listener channel by name. This will find the first
234+
// listener in the list with the specified name and will remove it.
235+
func (state *ServicesState) RemoveListener(name string) error {
236+
state.Lock()
237+
defer state.Unlock()
238+
239+
for i := 0; i < len(state.listeners); i++ {
240+
if state.listeners[i].Name() == name {
241+
state.listeners = append(state.listeners[:i], state.listeners[i+1:]...)
242+
log.Debugf("RemoveListener(): removed %s, new count %d", name, len(state.listeners))
243+
return nil
244+
}
245+
}
246+
247+
return fmt.Errorf("No listener found with the name: %s", name)
248+
}
249+
233250
// Take a service and merge it into our state. Correctly handle
234251
// timestamps so we only add things newer than what we already
235252
// know about. Retransmits updates to cluster peers.

catalog/services_state_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,20 @@ func Test_Listeners(t *testing.T) {
461461
So(len(state.listeners), ShouldEqual, 1)
462462
})
463463

464+
Convey("Removing listeners results in them being removed from the list", func() {
465+
state.AddListener(listener)
466+
So(len(state.listeners), ShouldEqual, 1)
467+
468+
err := state.RemoveListener("listener1")
469+
So(len(state.listeners), ShouldEqual, 0)
470+
So(err, ShouldBeNil)
471+
})
472+
473+
Convey("Removing a listener that doesn't exist returns an error", func() {
474+
err := state.RemoveListener("foo")
475+
So(err, ShouldNotBeNil)
476+
})
477+
464478
Convey("A major state change event notifies all listeners", func() {
465479
var result ChangeEvent
466480
var result2 ChangeEvent

http.go

+42-18
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,31 @@ type ApiServices struct {
3030
ClusterName string
3131
}
3232

33+
// A ServicesState.Listener that we use for the /watch endpoint
34+
type HttpListener struct {
35+
eventChan chan catalog.ChangeEvent
36+
name string
37+
}
38+
39+
func NewHttpListener() *HttpListener {
40+
return &HttpListener{
41+
// This should be fine enough granularity for practical purposes
42+
name: fmt.Sprintf("httpListener-%d", time.Now().UTC().UnixNano()),
43+
// Listeners must have buffered channels. We'll use a
44+
// somewhat larger buffer here because of the slow link
45+
// problem with http
46+
eventChan: make(chan catalog.ChangeEvent, 50),
47+
}
48+
}
49+
50+
func (h *HttpListener) Chan() chan catalog.ChangeEvent {
51+
return h.eventChan
52+
}
53+
54+
func (h *HttpListener) Name() string {
55+
return h.name
56+
}
57+
3358
func makeHandler(fn func(http.ResponseWriter, *http.Request,
3459
*memberlist.Memberlist, *catalog.ServicesState),
3560
list *memberlist.Memberlist, state *catalog.ServicesState) http.HandlerFunc {
@@ -44,29 +69,29 @@ func watchHandler(response http.ResponseWriter, req *http.Request, list *memberl
4469

4570
response.Header().Set("Content-Type", "application/json")
4671

47-
lastChange := time.Unix(0, 0)
4872
var jsonBytes []byte
4973
var err error
5074

51-
for {
52-
var changed bool
75+
listener := NewHttpListener()
5376

54-
func() { // Wrap critical section
55-
state.RLock()
56-
defer state.RUnlock()
77+
// Find out when the http connection closed so we can stop
78+
notify := response.(http.CloseNotifier).CloseNotify()
5779

58-
if state.LastChanged.After(lastChange) {
59-
lastChange = state.LastChanged
60-
jsonBytes, err = json.Marshal(state.ByService())
61-
if err != nil {
62-
log.Errorf("Error marshaling state in watchHandler: %s", err.Error())
63-
return
64-
}
65-
changed = true // Trigger sending new encoding
66-
}
67-
}()
80+
// Let's subscribe to state change events
81+
state.AddListener(listener)
82+
defer state.RemoveListener(listener.Name())
6883

69-
if changed {
84+
for {
85+
select {
86+
case <-notify:
87+
break
88+
89+
case <-listener.Chan():
90+
jsonBytes, err = json.Marshal(state.ByService())
91+
if err != nil {
92+
log.Errorf("Error marshaling state in watchHandler: %s", err.Error())
93+
return
94+
}
7095
// In order to flush immediately, we have to cast to a Flusher.
7196
// The normal HTTP library supports this but not all do, so we
7297
// check just in case.
@@ -75,7 +100,6 @@ func watchHandler(response http.ResponseWriter, req *http.Request, list *memberl
75100
f.Flush()
76101
}
77102
}
78-
time.Sleep(250 * time.Millisecond)
79103
}
80104
}
81105

0 commit comments

Comments
 (0)