Skip to content

Commit 8dde295

Browse files
Chao Xuk8s-publishing-bot
Chao Xu
authored andcommitted
Add a unit test testing the HTTP/2 health check help the REST client
detects broken TCP connections. Kubernetes-commit: ba7b1f7a89ffed78115ab0229b7504d05f6c7d48
1 parent 7c9ea22 commit 8dde295

File tree

1 file changed

+164
-0
lines changed

1 file changed

+164
-0
lines changed

rest/connection_test.go

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package rest
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"net"
24+
"net/http"
25+
"net/http/httptest"
26+
"net/url"
27+
"os"
28+
"strconv"
29+
"sync/atomic"
30+
"testing"
31+
"time"
32+
33+
"k8s.io/apimachinery/pkg/runtime/schema"
34+
"k8s.io/apimachinery/pkg/runtime/serializer"
35+
utilnet "k8s.io/apimachinery/pkg/util/net"
36+
)
37+
38+
type tcpLB struct {
39+
t *testing.T
40+
ln net.Listener
41+
serverURL string
42+
dials int32
43+
}
44+
45+
func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) {
46+
out, err := net.Dial("tcp", lb.serverURL)
47+
if err != nil {
48+
lb.t.Log(err)
49+
return
50+
}
51+
go io.Copy(out, in)
52+
go io.Copy(in, out)
53+
<-stopCh
54+
if err := out.Close(); err != nil {
55+
lb.t.Fatalf("failed to close connection: %v", err)
56+
}
57+
}
58+
59+
func (lb *tcpLB) serve(stopCh chan struct{}) {
60+
conn, err := lb.ln.Accept()
61+
if err != nil {
62+
lb.t.Fatalf("failed to accept: %v", err)
63+
}
64+
atomic.AddInt32(&lb.dials, 1)
65+
go lb.handleConnection(conn, stopCh)
66+
}
67+
68+
func newLB(t *testing.T, serverURL string) *tcpLB {
69+
ln, err := net.Listen("tcp", "127.0.0.1:0")
70+
if err != nil {
71+
t.Fatalf("failed to bind: %v", err)
72+
}
73+
lb := tcpLB{
74+
serverURL: serverURL,
75+
ln: ln,
76+
t: t,
77+
}
78+
return &lb
79+
}
80+
81+
func setEnv(key, value string) func() {
82+
originalValue := os.Getenv(key)
83+
os.Setenv(key, value)
84+
return func() {
85+
os.Setenv(key, originalValue)
86+
}
87+
}
88+
89+
const (
90+
readIdleTimeout int = 1
91+
pingTimeout int = 1
92+
)
93+
94+
func TestReconnectBrokenTCP(t *testing.T) {
95+
defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))()
96+
defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))()
97+
defer setEnv("DISABLE_HTTP2", "")()
98+
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
99+
fmt.Fprintf(w, "Hello, %s", r.Proto)
100+
}))
101+
ts.EnableHTTP2 = true
102+
ts.StartTLS()
103+
defer ts.Close()
104+
105+
u, err := url.Parse(ts.URL)
106+
if err != nil {
107+
t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
108+
}
109+
lb := newLB(t, u.Host)
110+
defer lb.ln.Close()
111+
stopCh := make(chan struct{})
112+
go lb.serve(stopCh)
113+
transport, ok := ts.Client().Transport.(*http.Transport)
114+
if !ok {
115+
t.Fatalf("failed to assert *http.Transport")
116+
}
117+
config := &Config{
118+
Host: "https://" + lb.ln.Addr().String(),
119+
Transport: utilnet.SetTransportDefaults(transport),
120+
Timeout: 1 * time.Second,
121+
// These fields are required to create a REST client.
122+
ContentConfig: ContentConfig{
123+
GroupVersion: &schema.GroupVersion{},
124+
NegotiatedSerializer: &serializer.CodecFactory{},
125+
},
126+
}
127+
client, err := RESTClientFor(config)
128+
if err != nil {
129+
t.Fatalf("failed to create REST client: %v", err)
130+
}
131+
data, err := client.Get().AbsPath("/").DoRaw(context.TODO())
132+
if err != nil {
133+
t.Fatalf("unexpected err: %s: %v", data, err)
134+
}
135+
if string(data) != "Hello, HTTP/2.0" {
136+
t.Fatalf("unexpected response: %s", data)
137+
}
138+
139+
// Deliberately let the LB stop proxying traffic for the current
140+
// connection. This mimics a broken TCP connection that's not properly
141+
// closed.
142+
close(stopCh)
143+
144+
stopCh = make(chan struct{})
145+
go lb.serve(stopCh)
146+
// Sleep enough time for the HTTP/2 health check to detect and close
147+
// the broken TCP connection.
148+
time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second)
149+
// If the HTTP/2 health check were disabled, the broken connection
150+
// would still be in the connection pool, the following request would
151+
// then reuse the broken connection instead of creating a new one, and
152+
// thus would fail.
153+
data, err = client.Get().AbsPath("/").DoRaw(context.TODO())
154+
if err != nil {
155+
t.Fatalf("unexpected err: %v", err)
156+
}
157+
if string(data) != "Hello, HTTP/2.0" {
158+
t.Fatalf("unexpected response: %s", data)
159+
}
160+
dials := atomic.LoadInt32(&lb.dials)
161+
if dials != 2 {
162+
t.Fatalf("expected %d dials, got %d", 2, dials)
163+
}
164+
}

0 commit comments

Comments
 (0)