-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsidecar_watcher.go
117 lines (97 loc) · 3.14 KB
/
sidecar_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main
import (
"net/http"
"time"
"github.com/Nitro/sidecar/catalog"
"github.com/Nitro/sidecar/service"
"github.com/relistan/go-director"
log "github.com/sirupsen/logrus"
)
// A SidecarWatcher attaches to the /watch endpoint on a Sidecar instance and
// sends notifications when something has changed in the remote state.
//
// haproxy-api uses this when in Follow mode.
const (
// CONNECTION_REFRESH_TIME is the refresh interval that NewSidecarWatcher
// assigns to RefreshConn and uses to arm the watcher's initial timer.
CONNECTION_REFRESH_TIME = 3 * time.Minute
)
// SidecarWatcher bundles the HTTP client, refresh timer and notification
// channel that Follow uses to watch a Sidecar URL and signal changes.
//
// Heavily modified from @bparli's Traefik provider for Sidecar:
// https://github.com/Nitro/traefik/blob/master/provider/sidecar.go
type SidecarWatcher struct {
RefreshConn time.Duration // How often to refresh the connection to Sidecar backend
// Client issues the GET request made by Follow.
Client *http.Client
// transport is the same transport installed in Client; Follow calls
// CancelRequest on it to tear down the open request.
transport *http.Transport
// notifyChan receives an empty struct from notify for each change.
notifyChan chan struct{}
// looper drives the repeated connect/wait cycle in Follow.
looper director.Looper
// timer is waited on by Follow and is stopped and reset by onChange.
timer *time.Timer
// url is the address requested by Follow.
url string
}
// NewSidecarWatcher builds a SidecarWatcher for the given url, looper and
// notification channel. The refresh interval is set to
// CONNECTION_REFRESH_TIME and the timer is armed with that same duration
// immediately. The chosen interval is logged before the watcher is returned.
func NewSidecarWatcher(url string, looper director.Looper, notifyChan chan struct{}) *SidecarWatcher {
	// Both timeouts are explicitly zero, which net/http treats as "no timeout".
	transport := &http.Transport{ResponseHeaderTimeout: 0}
	client := &http.Client{Timeout: 0, Transport: transport}

	watcher := new(SidecarWatcher)
	watcher.RefreshConn = CONNECTION_REFRESH_TIME
	watcher.Client = client
	watcher.looper = looper
	watcher.transport = transport
	watcher.notifyChan = notifyChan
	watcher.timer = time.NewTimer(CONNECTION_REFRESH_TIME)
	watcher.url = url

	log.Infof("Using Sidecar connection refresh interval: %s", watcher.RefreshConn.String())

	return watcher
}
// onChange is the callback passed to the stream decoder in Follow; it is
// triggered by changes from the Sidecar /watch endpoint. The state argument
// is not used, but the callback is required to have this signature.
//
// When err is non-nil the error is logged and nothing else happens: no
// notification is sent and the refresh timer is left alone, so the
// connection is allowed to time out. Otherwise a notification is sent and
// the refresh timer is restarted.
func (w *SidecarWatcher) onChange(state map[string][]*service.Service, err error) {
	// If something went wrong, we bail, don't reload HAproxy,
	// and let the connection time out
	if err != nil {
		log.Errorf("Got error from stream parser: %s", err.Error())
		return
	}

	w.notify()

	// Stop the timer before resetting it. If it had already fired, drain the
	// pending tick, but never block: Follow receives from the same channel and
	// may already have consumed it. A blocking receive here would stall this
	// goroutine until the next expiry and then steal that tick from Follow.
	if !w.timer.Stop() {
		select {
		case <-w.timer.C:
		default:
		}
	}
	w.resetTimer()
}
// resetTimer re-arms the watcher's timer so that it fires after RefreshConn.
func (w *SidecarWatcher) resetTimer() {
	interval := w.RefreshConn
	w.timer.Reset(interval)
}
// notify signals a change by sending an empty struct on notifyChan. This is
// a plain channel send, so it waits whenever the channel cannot accept the
// value immediately.
func (w *SidecarWatcher) notify() {
	var signal struct{}
	w.notifyChan <- signal
}
// Follow attaches to the /watch endpoint on a Sidecar instance and sends
// notifications on the notifyChan when something has changed on the remote
// host. It uses a timer to guarantee that we get a refresh on the open
// connection every RefreshConn so that we don't end up being orphaned.
//
// Each pass of the looper opens one request to w.url, hands the response
// body to the stream decoder in a separate goroutine, and then waits for the
// timer to expire before cancelling the request and closing the body.
// Failures to build or send the request are logged and nil is returned to
// the looper rather than the error.
func (w *SidecarWatcher) Follow() {
	w.looper.Loop(func() error {
		req, err := http.NewRequest("GET", w.url, nil)
		if err != nil {
			log.Errorf("Error creating http request to Sidecar: %s, Error: %s", w.url, err)
			return nil
		}

		resp, err := w.Client.Do(req)
		if err != nil {
			log.Errorf("Error connecting to Sidecar: %s, Error: %s", w.url, err)
			// Back off briefly before the looper tries again.
			time.Sleep(5 * time.Second)
			return nil
		}
		// The decoder only receives the body as a reader, so closing it is our
		// job. Without this every refresh cycle leaks the response body and
		// the connection behind it.
		defer resp.Body.Close()

		// DecodeStream will trigger the onChange callback on each event
		go catalog.DecodeStream(resp.Body, w.onChange)

		// Block waiting on the timer to expire
		<-w.timer.C
		w.resetTimer()

		// Abort the in-flight request so the decoder goroutine's read ends.
		// NOTE(review): Transport.CancelRequest is deprecated in favour of
		// request contexts; consider migrating.
		w.transport.CancelRequest(req)
		return nil
	})
}