
Commit fd16dc0

Merge pull request #3 from donbing007/feature/fix-websocket-healthcheck
Feature/fix websocket healthcheck
2 parents feb4097 + 6b02690 commit fd16dc0

File tree

8 files changed: +1061 additions, -86 deletions

Gopkg.lock

Lines changed: 337 additions & 41 deletions
Some generated files are not rendered by default.

agent/agent.go

Lines changed: 39 additions & 24 deletions
@@ -6,8 +6,6 @@ import (
 	"github.com/choerodon/choerodon-cluster-agent/manager"
 	"github.com/choerodon/choerodon-cluster-agent/pkg/cluster"
 	"github.com/choerodon/choerodon-cluster-agent/pkg/cluster/kubernetes"
-	k8sclient "k8s.io/client-go/kubernetes"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"github.com/choerodon/choerodon-cluster-agent/pkg/git"
 	"github.com/choerodon/choerodon-cluster-agent/pkg/helm"
 	"github.com/choerodon/choerodon-cluster-agent/pkg/kube"
@@ -17,6 +15,8 @@ import (
 	"github.com/choerodon/choerodon-cluster-agent/ws"
 	"github.com/golang/glog"
 	"github.com/spf13/pflag"
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sclient "k8s.io/client-go/kubernetes"
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
 	"net/http"
 	"os"
@@ -36,12 +36,19 @@ const (
 )
 
 type AgentOptions struct {
-	Listen string
-	UpstreamURL string
-	Token string
+	Listen               string
+	UpstreamURL          string
+	Token                string
+	ReadLimit            int64
+	ConnectionTimeout    time.Duration
+	WriteTimeout         time.Duration
+	HealthCheckDuration  time.Duration
+	HealthCheckTimeout   time.Duration
+	HealthCheckTryNumber int32
+
 	PrintVersion bool
 	// kubernetes controller
-	PlatformCode string
+	PlatformCode            string
 	ConcurrentEndpointSyncs int32
 	ConcurrentServiceSyncs int32
 	ConcurrentRSSyncs int32
@@ -127,8 +134,15 @@ func Run(o *AgentOptions, f cmdutil.Factory) {
 
 	checkKube(kubeClient.GetKubeClient())
 
+	appClient, err := ws.NewClient(ws.Token(o.Token), o.UpstreamURL, chans, &ws.Conf{
+		ReadLimit:            o.ReadLimit,
+		ConnectionTimeout:    o.ConnectionTimeout,
+		WriteTimeout:         o.WriteTimeout,
+		HealthCheckTryNumber: o.HealthCheckTryNumber,
+		HealthCheckTimeout:   o.HealthCheckTimeout,
+		HealthCheckDuration:  o.HealthCheckDuration,
+	})
 
-	appClient, err := ws.NewClient(ws.Token(o.Token), o.UpstreamURL, chans)
 	if err != nil {
 		errChan <- err
 		return
@@ -137,14 +151,14 @@ func Run(o *AgentOptions, f cmdutil.Factory) {
 
 	//gitRemote := git.Remote{URL: o.gitURL}
 	gitConfig := git.Config{
-		Branch: o.gitBranch,
-		Path: o.gitPath,
-		UserName: o.gitUser,
-		GitUrl: o.gitURL,
-		UserEmail: o.gitEmail,
-		SyncTag: o.gitSyncTag,
-		DevOpsTag: o.gitDevOpsSyncTag,
-		NotesRef: o.gitNotesRef,
+		Branch:          o.gitBranch,
+		Path:            o.gitPath,
+		UserName:        o.gitUser,
+		GitUrl:          o.gitURL,
+		UserEmail:       o.gitEmail,
+		SyncTag:         o.gitSyncTag,
+		DevOpsTag:       o.gitDevOpsSyncTag,
+		NotesRef:        o.gitNotesRef,
 		GitPollInterval: o.gitPollInterval,
 	}
 	//gitRepo := git.NewRepo(gitRemote, git.PollInterval(o.gitPollInterval))
@@ -178,7 +192,6 @@ func Run(o *AgentOptions, f cmdutil.Factory) {
 	// k8sManifests = &kubernetes.Manifests{Namespace: o.Namespace}
 	//}
 
-
 	namespaces := manager.NewNamespaces()
 
 	ctx := controller.CreateControllerContext(
@@ -204,11 +217,11 @@ func Run(o *AgentOptions, f cmdutil.Factory) {
 			glog.Fatal(err)
 		}
 		glog.Infof("kubectl %s", kubectl)
-		cfg,_ := f.ClientConfig()
+		cfg, _ := f.ClientConfig()
 		kubectlApplier := kubernetes.NewKubectl(kubectl, cfg)
 		kubectlApplier.ApplySingleObj("kube-system", model.CRD_YAML)
 
-		k8s = kubernetes.NewCluster( kubeClient.GetKubeClient(), kubeClient.GetC7NClient(), kubectlApplier)
+		k8s = kubernetes.NewCluster(kubeClient.GetKubeClient(), kubeClient.GetC7NClient(), kubectlApplier)
 		k8sManifests = &kubernetes.Manifests{}
 	}
 	workerManager := worker.NewWorkerManager(
@@ -233,15 +246,12 @@ func Run(o *AgentOptions, f cmdutil.Factory) {
 	go workerManager.Start()
 	shutdownWg.Add(1)
 
-
-
 	go func() {
 		errChan <- http.ListenAndServe(o.Listen, nil)
 	}()
 
 }
 
-
 func (o *AgentOptions) BindFlags(fs *pflag.FlagSet) {
 	fs.BoolVar(&o.PrintVersion, "version", false, "print the version number")
 	fs.StringVar(&o.Listen, "listen", o.Listen, "address:port to listen on")
@@ -250,6 +260,12 @@ func (o *AgentOptions) BindFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&o.UpstreamURL, "connect", "", "Connect to an upstream service")
 	fs.StringVar(&o.Token, "token", "", "Authentication token for upstream service")
 	fs.Int32Var(&o.ClusterId, "clusterId", 0, "the env cluster id in devops")
+	fs.Int64Var(&o.ReadLimit, "readLimit", 524288, "message read size")
+	fs.DurationVar(&o.ConnectionTimeout, "connectionTimeout", 10*time.Second, "connection server timeout, default 10 second.")
+	fs.DurationVar(&o.WriteTimeout, "writeTimeout", 3*time.Second, "write message timeout, default 3 second.")
+	fs.DurationVar(&o.HealthCheckDuration, "healthCheckDuration", 3*time.Second, "health check duration.")
+	fs.DurationVar(&o.HealthCheckTimeout, "healthCheckTimeout", 10*time.Second, "health check wait pong duration.")
+	fs.Int32Var(&o.HealthCheckTryNumber, "healthCheckTryNumber", int32(3), "health check max try number.")
 
 	// kubernetes controller
 	fs.StringVar(&o.PlatformCode, "choerodon-id", "", "choerodon platform id label")
@@ -279,13 +295,12 @@ func (o *AgentOptions) BindFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&o.kubernetesKubectl, "kubernetes-kubectl", "", "Optional, explicit path to kubectl tool")
 }
 
-func checkKube(client *k8sclient.Clientset) {
+func checkKube(client *k8sclient.Clientset) {
 	glog.Infof("check k8s role binding...")
 	_, err := client.CoreV1().Pods("").List(meta_v1.ListOptions{})
 	if err != nil {
 		glog.Errorf("check role binding failed %v", err)
-		os.Exit(0 )
+		os.Exit(0)
 	}
 	glog.Infof(" k8s role binding succeed.")
 }
-
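
The six new options are ordinary pflag bindings, so the duration flags accept Go duration strings such as 30s or 1m on the agent command line, and the remaining two are plain integers. Below is a minimal, self-contained sketch of how they parse; the flag names and defaults are copied from the hunk above, while the throwaway FlagSet and the sample arguments are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	// Stand-in FlagSet; names and defaults mirror BindFlags in agent/agent.go above.
	fs := pflag.NewFlagSet("agent", pflag.ExitOnError)

	var (
		readLimit            int64
		connectionTimeout    time.Duration
		writeTimeout         time.Duration
		healthCheckDuration  time.Duration
		healthCheckTimeout   time.Duration
		healthCheckTryNumber int32
	)

	fs.Int64Var(&readLimit, "readLimit", 524288, "message read size")
	fs.DurationVar(&connectionTimeout, "connectionTimeout", 10*time.Second, "connection server timeout")
	fs.DurationVar(&writeTimeout, "writeTimeout", 3*time.Second, "write message timeout")
	fs.DurationVar(&healthCheckDuration, "healthCheckDuration", 3*time.Second, "health check duration")
	fs.DurationVar(&healthCheckTimeout, "healthCheckTimeout", 10*time.Second, "health check wait pong duration")
	fs.Int32Var(&healthCheckTryNumber, "healthCheckTryNumber", 3, "health check max try number")

	// Duration flags take Go duration syntax; flags left unset keep their defaults.
	if err := fs.Parse([]string{"--connectionTimeout=30s", "--healthCheckTryNumber=5"}); err != nil {
		panic(err)
	}
	fmt.Println(readLimit, connectionTimeout, writeTimeout, healthCheckTryNumber) // 524288 30s 3s 5
}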

ws/client.go

Lines changed: 76 additions & 16 deletions
@@ -24,6 +24,7 @@ const (
 	initialBackOff = 1 * time.Second
 	maxBackOff = 60 * time.Second
 )
+
 var connectFlag = false
 
 type WebSocketClient interface {
@@ -43,12 +44,23 @@ type appClient struct {
 	backgroundWait sync.WaitGroup
 	pipeConns map[string]*websocket.Conn
 	respQueue []*model.Packet
+	conf        *Conf
+	healthCheck *HealthCheck
+}
+
+type Conf struct {
+	ReadLimit            int64
+	ConnectionTimeout    time.Duration
+	WriteTimeout         time.Duration
+	HealthCheckDuration  time.Duration
+	HealthCheckTimeout   time.Duration
+	HealthCheckTryNumber int32
 }
 
 func NewClient(
 	t Token,
 	endpoint string,
-	crChannel *manager.CRChan) (WebSocketClient, error) {
+	crChannel *manager.CRChan, connConf *Conf) (WebSocketClient, error) {
 	if endpoint == "" {
 		return nil, fmt.Errorf("no upstream URL given")
 	}
@@ -61,13 +73,14 @@
 	httpClient := cleanhttp.DefaultClient()
 
 	c := &appClient{
-		url: endpointURL,
-		token: t,
+		url:       endpointURL,
+		token:     t,
 		crChannel: crChannel,
-		quit: make(chan struct{}),
-		client: httpClient,
-		pipeConns: make(map[string]*websocket.Conn),
-		respQueue: make([]*model.Packet, 0, 100),
+		quit:      make(chan struct{}),
+		client:    httpClient,
+		pipeConns: make(map[string]*websocket.Conn),
+		respQueue: make([]*model.Packet, 0, 100),
+		conf:      connConf,
 	}
 
 	return c, nil
@@ -102,12 +115,18 @@ func (c *appClient) Loop(stop <-chan struct{}, done *sync.WaitGroup) {
 func (c *appClient) connect() error {
 	glog.V(1).Info("Start connect to DevOps service")
 	var err error
-	c.conn, err = dial(c.url.String(), c.token)
+	c.conn, err = dial(c.url.String(), c.token, c.conf.ConnectionTimeout)
 	if err != nil {
 		return err
 	}
 	glog.V(1).Info("Connect to DevOps service success")
 
+	// Configure the connection.
+	c.configure()
+
+	// Start the health check.
+	c.startHealthCheck()
+
 	// Connection established; sync the resource objects.
 	if connectFlag {
 		c.crChannel.CommandChan <- newReConnectCommand()
@@ -124,7 +143,6 @@ func (c *appClient) connect() error {
 	go func() {
 		defer close(done)
 
-		c.conn.SetPingHandler(nil)
 		for {
 			var command model.Packet
 			err := c.conn.ReadJSON(&command)
@@ -134,14 +152,17 @@
 				}
 				break
 			}
+
+			c.healthCheck.OnRecvice(nil)
+
 			glog.V(1).Info("receive command: ", command)
 			c.crChannel.CommandChan <- &command
 		}
 	}()
 
 	end := 0
 	for ; end < len(c.respQueue); end++ {
-		c.conn.SetWriteDeadline(time.Now().Add(WriteWait))
+		c.conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
 		resp := c.respQueue[end]
 		if err := c.sendResponse(resp); err != nil {
 			c.respQueue = c.respQueue[end:]
@@ -292,7 +313,7 @@ func (c *appClient) pipeConnection(id string, pipe common.Pipe) (bool, error) {
 	}
 	newURLStr := fmt.Sprintf("%s.%s:%s", newURL.String(), pipe.PipeType(), id)
 	headers := http.Header{}
-	conn, resp, err := dialWS(newURLStr, headers)
+	conn, resp, err := dialWS(newURLStr, headers, c.conf.ConnectionTimeout)
 	if resp != nil && resp.StatusCode == http.StatusNotFound {
 		glog.V(2).Info("response with not found")
 		pipe.Close()
@@ -317,16 +338,55 @@
 	return true, nil
 }
 
-func newReConnectCommand() *model.Packet{
+func (c *appClient) configure() {
+	c.conn.SetReadLimit(c.conf.ReadLimit)
+
+	c.conn.SetPingHandler(func(message string) error {
+		return c.healthCheck.OnPing([]byte(message))
+	})
+
+	c.conn.SetPongHandler(func(message string) error {
+		return c.healthCheck.OnPone([]byte(message))
+	})
+
+	c.conn.SetCloseHandler(func(code int, text string) error {
+		c.healthCheck.Stop()
+
+		return nil
+	})
+}
+
+func (c *appClient) startHealthCheck() error {
+
+	if c.healthCheck != nil {
+		c.healthCheck = nil
+	}
+
+	h, err := NewHealthCheck(c.conf, c.conn, func() {
+		e := c.conn.Close()
+		if e != nil {
+			glog.V(1).Infof("An error occurred while closing target %s because of %v.", c.conn.RemoteAddr().String(), e)
+		}
+	})
+
+	if err != nil {
+		return err
+	} else {
+		c.healthCheck = h
+		return nil
+	}
+}
+
+func newReConnectCommand() *model.Packet {
 	return &model.Packet{
-		Key: "inter:inter",
+		Key:  "inter:inter",
 		Type: model.ReSyncAgent,
 	}
 }
-func newUpgradeInfoCommand(connectUrl string) *model.Packet{
+func newUpgradeInfoCommand(connectUrl string) *model.Packet {
 	return &model.Packet{
-		Key: "inter:inter",
-		Type: model.UpgradeCluster,
+		Key:     "inter:inter",
+		Type:    model.UpgradeCluster,
 		Payload: connectUrl,
 	}
 }
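
The read loop used to install c.conn.SetPingHandler(nil), which simply restores the websocket library's default ping handling; the new configure/startHealthCheck pair instead routes pings, pongs, ordinary reads and the close event through a single HealthCheck, so an upstream that goes silent can be detected and the connection closed for the reconnect loop to retry. The HealthCheck implementation itself lives in one of the changed files not rendered in this view. Purely to illustrate the contract implied by the call sites above (NewHealthCheck(conf, conn, onFailure), OnRecvice, OnPing, OnPone and Stop, spelled exactly as called), a sketch is given below; the gorilla/websocket import path, the miss counter and the ping loop are assumptions, not the repository's actual code.

package ws

import (
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
)

// HealthCheck here is a sketch only; the real type ships in a changed file
// that this diff view does not render. It reuses the Conf type added above.
type HealthCheck struct {
	conf      *Conf
	conn      *websocket.Conn
	onFailure func()        // assumed: invoked when the peer stops answering
	misses    int32         // assumed: pings sent since the last ping/pong/read
	stop      chan struct{}
}

// NewHealthCheck mirrors the call site in startHealthCheck above.
func NewHealthCheck(conf *Conf, conn *websocket.Conn, onFailure func()) (*HealthCheck, error) {
	h := &HealthCheck{conf: conf, conn: conn, onFailure: onFailure, stop: make(chan struct{})}
	go h.loop()
	return h, nil
}

// loop pings every HealthCheckDuration and gives up after HealthCheckTryNumber
// unanswered rounds (assumed semantics of those two options).
func (h *HealthCheck) loop() {
	ticker := time.NewTicker(h.conf.HealthCheckDuration)
	defer ticker.Stop()
	for {
		select {
		case <-h.stop:
			return
		case <-ticker.C:
			if atomic.AddInt32(&h.misses, 1) > h.conf.HealthCheckTryNumber {
				h.onFailure() // closes the connection via the callback wired up in startHealthCheck
				return
			}
			deadline := time.Now().Add(h.conf.HealthCheckTimeout)
			_ = h.conn.WriteControl(websocket.PingMessage, nil, deadline)
		}
	}
}

// The hooks wired up in configure() reset the miss counter; OnPing also
// answers the server's ping with a pong.
func (h *HealthCheck) OnRecvice(_ []byte) {
	atomic.StoreInt32(&h.misses, 0)
}

func (h *HealthCheck) OnPone(_ []byte) error {
	atomic.StoreInt32(&h.misses, 0)
	return nil
}

func (h *HealthCheck) OnPing(message []byte) error {
	atomic.StoreInt32(&h.misses, 0)
	return h.conn.WriteControl(websocket.PongMessage, message, time.Now().Add(time.Second))
}

// Stop ends the ping loop; configure() calls it from the close handler.
func (h *HealthCheck) Stop() {
	select {
	case <-h.stop:
	default:
		close(h.stop)
	}
}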

ws/client_test.go

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 package ws
 
+
 import (
 	"encoding/json"
 	"fmt"
