Skip to content

Commit 1e7adee

Browse files
Merge pull request #124172 from liggitt/automated-cherry-pick-of-#123598-upstream-release-1.29
Automated cherry pick of #123598: Remotecommand test flake cleanup Kubernetes-commit: 15d121d04b51d12f304340e785b1659c25223afe
2 parents 0058eee + cc21122 commit 1e7adee

File tree

4 files changed

+37
-12
lines changed

4 files changed

+37
-12
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
golang.org/x/time v0.3.0
2626
google.golang.org/protobuf v1.33.0
2727
k8s.io/api v0.0.0-20240404161350-448db12cecfb
28-
k8s.io/apimachinery v0.0.0-20240404161013-3e7c65a7bc4d
28+
k8s.io/apimachinery v0.0.0-20240405121012-2bbf53022625
2929
k8s.io/klog/v2 v2.110.1
3030
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
3131
k8s.io/utils v0.0.0-20230726121419-3b25d923346b

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
155155
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
156156
k8s.io/api v0.0.0-20240404161350-448db12cecfb h1:/uYBtjPF21kf1qLyVVXGQOcFcjjqGowAssqopfbjQB0=
157157
k8s.io/api v0.0.0-20240404161350-448db12cecfb/go.mod h1:dPhF9d7Kl/mI8T1SzZBp2/saFzES5J24O1PGAwNajB0=
158-
k8s.io/apimachinery v0.0.0-20240404161013-3e7c65a7bc4d h1:bth65kuQyrmaqeI1unRAE9O4KtHWeblZ8mIZEjpOuc4=
159-
k8s.io/apimachinery v0.0.0-20240404161013-3e7c65a7bc4d/go.mod h1:i3FJVwhvSp/6n8Fl4K97PJEP8C+MM+aoDq4+ZJBf70Y=
158+
k8s.io/apimachinery v0.0.0-20240405121012-2bbf53022625 h1:itYkKr52W3ugNsYoBZwpwcezu/vPxg3GK7XVkX24GAo=
159+
k8s.io/apimachinery v0.0.0-20240405121012-2bbf53022625/go.mod h1:i3FJVwhvSp/6n8Fl4K97PJEP8C+MM+aoDq4+ZJBf70Y=
160160
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
161161
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
162162
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=

tools/remotecommand/websocket.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ type wsStreamCreator struct {
187187
// map of stream id to stream; multiple streams read/write the connection
188188
streams map[byte]*stream
189189
streamsMu sync.Mutex
190+
// setStreamErr holds the error to return to anyone calling setStreams.
191+
// this is populated in closeAllStreamReaders
192+
setStreamErr error
190193
}
191194

192195
func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
@@ -202,10 +205,14 @@ func (c *wsStreamCreator) getStream(id byte) *stream {
202205
return c.streams[id]
203206
}
204207

205-
func (c *wsStreamCreator) setStream(id byte, s *stream) {
208+
func (c *wsStreamCreator) setStream(id byte, s *stream) error {
206209
c.streamsMu.Lock()
207210
defer c.streamsMu.Unlock()
211+
if c.setStreamErr != nil {
212+
return c.setStreamErr
213+
}
208214
c.streams[id] = s
215+
return nil
209216
}
210217

211218
// CreateStream uses id from passed headers to create a stream over "c.conn" connection.
@@ -228,7 +235,11 @@ func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream,
228235
connWriteLock: &c.connWriteLock,
229236
id: id,
230237
}
231-
c.setStream(id, s)
238+
if err := c.setStream(id, s); err != nil {
239+
_ = s.writePipe.Close()
240+
_ = s.readPipe.Close()
241+
return nil, err
242+
}
232243
return s, nil
233244
}
234245

@@ -312,14 +323,20 @@ func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, de
312323
}
313324

314325
// closeAllStreamReaders closes readers in all streams.
315-
// This unblocks all stream.Read() calls.
326+
// This unblocks all stream.Read() calls, and keeps any future streams from being created.
316327
func (c *wsStreamCreator) closeAllStreamReaders(err error) {
317328
c.streamsMu.Lock()
318329
defer c.streamsMu.Unlock()
319330
for _, s := range c.streams {
320331
// Closing writePipe unblocks all readPipe.Read() callers and prevents any future writes.
321332
_ = s.writePipe.CloseWithError(err)
322333
}
334+
// ensure callers to setStreams receive an error after this point
335+
if err != nil {
336+
c.setStreamErr = err
337+
} else {
338+
c.setStreamErr = fmt.Errorf("closed all streams")
339+
}
323340
}
324341

325342
type stream struct {

tools/remotecommand/websocket_test.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -817,15 +817,16 @@ func TestWebSocketClient_BadHandshake(t *testing.T) {
817817
// TestWebSocketClient_HeartbeatTimeout tests the heartbeat by forcing a
818818
// timeout by setting the ping period greater than the deadline.
819819
func TestWebSocketClient_HeartbeatTimeout(t *testing.T) {
820+
blockRequestCtx, unblockRequest := context.WithCancel(context.Background())
821+
defer unblockRequest()
820822
// Create fake WebSocket server which blocks.
821823
websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
822824
conns, err := webSocketServerStreams(req, w, streamOptionsFromRequest(req))
823825
if err != nil {
824826
t.Fatalf("error on webSocketServerStreams: %v", err)
825827
}
826828
defer conns.conn.Close()
827-
// Block server; heartbeat timeout (or test timeout) will fire before this returns.
828-
time.Sleep(1 * time.Second)
829+
<-blockRequestCtx.Done()
829830
}))
830831
defer websocketServer.Close()
831832
// Create websocket client connecting to fake server.
@@ -840,8 +841,8 @@ func TestWebSocketClient_HeartbeatTimeout(t *testing.T) {
840841
}
841842
streamExec := exec.(*wsStreamExecutor)
842843
// Ping period is greater than the ping deadline, forcing the timeout to fire.
843-
pingPeriod := 20 * time.Millisecond
844-
pingDeadline := 5 * time.Millisecond
844+
pingPeriod := wait.ForeverTestTimeout // this lets the heartbeat deadline expire without renewing it
845+
pingDeadline := time.Second // this gives setup 1 second to establish streams
845846
streamExec.heartbeatPeriod = pingPeriod
846847
streamExec.heartbeatDeadline = pingDeadline
847848
// Send some random data to the websocket server through STDIN.
@@ -859,8 +860,7 @@ func TestWebSocketClient_HeartbeatTimeout(t *testing.T) {
859860
}()
860861

861862
select {
862-
case <-time.After(pingPeriod * 5):
863-
// Give up after about five ping attempts
863+
case <-time.After(wait.ForeverTestTimeout):
864864
t.Fatalf("expected heartbeat timeout, got none.")
865865
case err := <-errorChan:
866866
// Expecting heartbeat timeout error.
@@ -1116,6 +1116,14 @@ func TestWebSocketClient_HeartbeatSucceeds(t *testing.T) {
11161116
wg.Wait()
11171117
}
11181118

1119+
func TestLateStreamCreation(t *testing.T) {
1120+
c := newWSStreamCreator(nil)
1121+
c.closeAllStreamReaders(nil)
1122+
if err := c.setStream(0, nil); err == nil {
1123+
t.Fatal("expected error adding stream after closeAllStreamReaders")
1124+
}
1125+
}
1126+
11191127
func TestWebSocketClient_StreamsAndExpectedErrors(t *testing.T) {
11201128
// Validate Stream functions.
11211129
c := newWSStreamCreator(nil)

0 commit comments

Comments
 (0)