Skip to content

Commit cbe3261

Browse files
committed
ISSUE-922: chore(tests/containers): implement retry if port-forwarding fails
1 parent 540495b commit cbe3261

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

tests/containers/kubernetes_utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def deploy(self, container_name: str) -> None:
225225
assert len(pod_name.items) == 1
226226
pod: kubernetes.client.models.v1_pod.V1Pod = pod_name.items[0]
227227

228-
p = socket_proxy.SocketProxy(exposing_contextmanager(core_v1_api, pod), "localhost", 0)
228+
p = socket_proxy.SocketProxy(lambda: exposing_contextmanager(core_v1_api, pod), "localhost", 0)
229229
t = threading.Thread(target=p.listen_and_serve_until_canceled)
230230
t.start()
231231
self.tf.defer(t, lambda thread: thread.join())

tests/containers/socket_proxy.py

+34-24
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
import select
77
import threading
88
import subprocess
9-
import typing
9+
import time
10+
from typing import Callable, ContextManager
1011

1112
from tests.containers.cancellation_token import CancellationToken
1213

@@ -50,7 +51,7 @@ def stop(self):
5051
class SocketProxy:
5152
def __init__(
5253
self,
53-
remote_socket_factory: typing.ContextManager[socket.socket],
54+
remote_socket_factory: Callable[..., ContextManager[socket.socket]],
5455
local_host: str = "localhost",
5556
local_port: int = 0,
5657
buffer_size: int = 4096
@@ -96,27 +97,32 @@ def get_actual_port(self) -> int:
9697
return self.server_socket.getsockname()[1]
9798

9899
def _handle_client(self, client_socket):
99-
with client_socket as _, self.remote_socket_factory as remote_socket:
100-
while True:
101-
readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
102-
103-
if self.cancellation_token.cancelled:
104-
break
105-
106-
if client_socket in readable:
107-
data = client_socket.recv(self.buffer_size)
108-
if not data:
109-
break
110-
remote_socket.send(data)
111-
112-
if remote_socket in readable:
113-
data = remote_socket.recv(self.buffer_size)
114-
if not data:
115-
break
116-
client_socket.send(data)
117-
118-
119-
if __name__ == "__main__":
100+
with client_socket as _:
101+
while not self.cancellation_token.cancelled:
102+
try:
103+
with self.remote_socket_factory() as remote_socket:
104+
while not self.cancellation_token.cancelled:
105+
readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
106+
107+
if client_socket in readable:
108+
data = client_socket.recv(self.buffer_size)
109+
if not data:
110+
return
111+
remote_socket.send(data)
112+
113+
if remote_socket in readable:
114+
data = remote_socket.recv(self.buffer_size)
115+
if not data:
116+
return
117+
client_socket.send(data)
118+
except ConnectionResetError as e:
119+
# data = remote_socket.recv(self.buffer_size) may fail like this
120+
# usually it happens if the pod has not been fully up, so retry is in order
121+
logging.info("failed to read, will try again", exc_info=e)
122+
time.sleep(2)
123+
124+
125+
def main() -> None:
120126
"""Sample application to show how this can work."""
121127

122128

@@ -161,7 +167,7 @@ def get_actual_port(self):
161167
server.join()
162168

163169

164-
proxy = SocketProxy(remote_socket_factory(), "localhost", 0)
170+
proxy = SocketProxy(remote_socket_factory, "localhost", 0)
165171
thread = threading.Thread(target=proxy.listen_and_serve_until_canceled)
166172
thread.start()
167173

@@ -171,3 +177,7 @@ def get_actual_port(self):
171177
print(client_socket.recv(1024)) # prints Hello World
172178

173179
thread.join()
180+
181+
182+
if __name__ == "__main__":
183+
main()

0 commit comments

Comments
 (0)