Skip to content

Commit a37f664

Browse files
Pod exec enhancements (#328)
* rework pod_exec example Rework this example to be closer to the one provided by the official kubernetes python api. The busybox pod is now created if it does not exist, making the example easier to use. Also, use contexts to ensure resources are properly released. This prepares the work for next commits that will add more cases in this pod_exec example. Signed-off-by: Olivier Matz <[email protected]> * introduce helper to get command return code in ws_client When the websocket API is used to execute a command on a pod, the status is sent over the ERROR_CHANNEL on termination. Add a helper to parse this information that returns the exit code of the command. This helper can only be used if _preload_content=False. The pod_exec example will be updated in next commit to make use of this new helper. Signed-off-by: Olivier Matz <[email protected]> * add an interactive case in pod_exec example Introduce an example that shows how the WsApiClient can be used to interactively execute a command on a pod. The example is similar to what is done in the official kubernetes python api. Signed-off-by: Olivier Matz <[email protected]> * remove extra await when calling ws_connect() The ClientSession.ws_connect() method is synchronous and returns a _RequestContextManager which takes a coroutine as parameter (here, ClientSession._ws_connect()). This context manager is in charge of closing the connection in its __aexit__() method, so it has to be used with "async with". However, this context manager can also be awaited as it has an __await__() method. In this case, it will await the _ws_connect() coroutine. This is what is done in the current code, but the connection will not be released. Remove the "await" to return the context manager, so that the user can use it with "async with", which will properly release resources. This is the documented way of using ws_connect(): https://docs.aiohttp.org/en/stable/client_quickstart.html#websockets Signed-off-by: Olivier Matz <[email protected]> --------- Signed-off-by: Olivier Matz <[email protected]>
1 parent 2126b1d commit a37f664

File tree

3 files changed

+157
-36
lines changed

3 files changed

+157
-36
lines changed

examples/pod_exec.py

+130-35
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,147 @@
11
import asyncio
22

3-
from kubernetes_asyncio import client, config
3+
from aiohttp.http import WSMsgType
4+
5+
from kubernetes_asyncio import client, config, utils
6+
from kubernetes_asyncio.client.api_client import ApiClient
47
from kubernetes_asyncio.stream import WsApiClient
8+
from kubernetes_asyncio.stream.ws_client import (
9+
ERROR_CHANNEL, STDERR_CHANNEL, STDOUT_CHANNEL,
10+
)
511

12+
BUSYBOX_POD = "busybox-test"
613

7-
async def main():
8-
# Configs can be set in Configuration class directly or using helper
9-
# utility. If no argument provided, the config will be loaded from
10-
# default location.
11-
await config.load_kube_config()
1214

13-
v1 = client.CoreV1Api()
15+
async def find_busybox_pod():
16+
async with ApiClient() as api:
17+
v1 = client.CoreV1Api(api)
18+
ret = await v1.list_pod_for_all_namespaces()
19+
for i in ret.items:
20+
if i.metadata.namespace == 'default' and i.metadata.name == BUSYBOX_POD:
21+
print(f"Found busybox pod: {i.metadata.name}")
22+
return i.metadata.name
23+
return None
1424

15-
print("Try to find a pod with busybox (name busybox*) ...")
16-
ret = await v1.list_pod_for_all_namespaces()
1725

18-
for i in ret.items:
19-
if i.metadata.name.startswith("busybox"):
20-
pod = i.metadata.name
21-
namespace = i.metadata.namespace
22-
print("Buxy box", pod, "namespace", namespace)
23-
break
24-
else:
25-
print("Busybox not found !")
26-
return
26+
async def create_busybox_pod():
27+
print(f"Pod {BUSYBOX_POD} does not exist. Creating it...")
28+
manifest = {
29+
'apiVersion': 'v1',
30+
'kind': 'Pod',
31+
'metadata': {
32+
'name': BUSYBOX_POD,
33+
},
34+
'spec': {
35+
'containers': [{
36+
'image': 'busybox',
37+
'name': 'sleep',
38+
"args": [
39+
"/bin/sh",
40+
"-c",
41+
"while true; do date; sleep 5; done"
42+
]
43+
}]
44+
}
45+
}
46+
async with ApiClient() as api:
47+
objects = await utils.create_from_dict(api, manifest, namespace="default")
48+
pod = objects[0]
49+
print(f"Created pod {pod.metadata.name}.")
50+
return pod.metadata.name
2751

28-
v1_ws = client.CoreV1Api(api_client=WsApiClient())
2952

30-
exec_command = [
31-
"/bin/sh",
32-
"-c",
33-
"echo This message goes to stderr >&2; echo This message goes to stdout",
34-
]
53+
async def wait_busybox_pod_ready():
54+
print(f"Waiting pod {BUSYBOX_POD} to be ready.")
55+
async with ApiClient() as api:
56+
v1 = client.CoreV1Api(api)
57+
while True:
58+
ret = await v1.read_namespaced_pod(name=BUSYBOX_POD, namespace="default")
59+
if ret.status.phase != 'Pending':
60+
break
61+
await asyncio.sleep(1)
3562

36-
resp = v1_ws.connect_get_namespaced_pod_exec(
37-
pod,
38-
namespace,
39-
command=exec_command,
40-
stderr=True,
41-
stdin=False,
42-
stdout=True,
43-
tty=False,
44-
)
4563

46-
ret = await resp
64+
async def main():
65+
# Configs can be set in Configuration class directly or using helper
66+
# utility. If no argument provided, the config will be loaded from
67+
# default location.
68+
await config.load_kube_config()
69+
70+
pod = await find_busybox_pod()
71+
if not pod:
72+
pod = await create_busybox_pod()
73+
await wait_busybox_pod_ready()
4774

48-
print("Response: ", ret)
75+
# Execute a command in a pod non-interactively, and display its output
76+
print("-------------")
77+
async with WsApiClient() as ws_api:
78+
v1_ws = client.CoreV1Api(api_client=ws_api)
79+
exec_command = [
80+
"/bin/sh",
81+
"-c",
82+
"echo This message goes to stderr >&2; echo This message goes to stdout",
83+
]
84+
ret = await v1_ws.connect_get_namespaced_pod_exec(
85+
pod,
86+
"default",
87+
command=exec_command,
88+
stderr=True,
89+
stdin=False,
90+
stdout=True,
91+
tty=False,
92+
)
93+
print(f"Response: {ret}")
4994

95+
# Execute a command interactively. If _preload_content=False is passed to
96+
# connect_get_namespaced_pod_exec(), the returned object is an aiohttp ClientWebSocketResponse
97+
# object, that can be manipulated directly.
98+
print("-------------")
99+
async with WsApiClient() as ws_api:
100+
v1_ws = client.CoreV1Api(api_client=ws_api)
101+
exec_command = ['/bin/sh']
102+
websocket = await v1_ws.connect_get_namespaced_pod_exec(
103+
BUSYBOX_POD,
104+
"default",
105+
command=exec_command,
106+
stderr=True,
107+
stdin=True,
108+
stdout=True,
109+
tty=False,
110+
_preload_content=False,
111+
)
112+
commands = [
113+
"echo 'This message goes to stdout'\n",
114+
"echo 'This message goes to stderr' >&2\n",
115+
"exit 1\n",
116+
]
117+
error_data = ""
118+
closed = False
119+
async with websocket as ws:
120+
while commands and not closed:
121+
command = commands.pop(0)
122+
stdin_channel_prefix = chr(0)
123+
await ws.send_bytes((stdin_channel_prefix + command).encode("utf-8"))
124+
while True:
125+
try:
126+
msg = await ws.receive(timeout=1)
127+
except asyncio.TimeoutError:
128+
break
129+
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
130+
closed = True
131+
break
132+
channel = msg.data[0]
133+
data = msg.data[1:].decode("utf-8")
134+
if not data:
135+
continue
136+
if channel == STDOUT_CHANNEL:
137+
print(f"stdout: {data}")
138+
elif channel == STDERR_CHANNEL:
139+
print(f"stderr: {data}")
140+
elif channel == ERROR_CHANNEL:
141+
error_data += data
142+
if error_data:
143+
returncode = ws_api.parse_error_data(error_data)
144+
print(f"Exit code: {returncode}")
50145

51146
if __name__ == "__main__":
52147
loop = asyncio.get_event_loop()

kubernetes_asyncio/stream/ws_client.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
# License for the specific language governing permissions and limitations
1111
# under the License.
1212

13+
import json
14+
1315
from six.moves.urllib.parse import urlencode, urlparse, urlunparse
1416

1517
from kubernetes_asyncio.client import ApiClient
@@ -54,6 +56,16 @@ def __init__(self, configuration=None, header_name=None, header_value=None,
5456
super().__init__(configuration, header_name, header_value, cookie, pool_threads)
5557
self.heartbeat = heartbeat
5658

59+
@classmethod
60+
def parse_error_data(cls, error_data):
61+
"""
62+
Parse data received on ERROR_CHANNEL and return the command exit code.
63+
"""
64+
error_data_json = json.loads(error_data)
65+
if error_data_json.get("status") == "Success":
66+
return 0
67+
return int(error_data_json["details"]["causes"][0]['message'])
68+
5769
async def request(self, method, url, query_params=None, headers=None,
5870
post_params=None, body=None, _preload_content=True,
5971
_request_timeout=None):
@@ -96,4 +108,4 @@ async def request(self, method, url, query_params=None, headers=None,
96108

97109
else:
98110

99-
return await self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)
111+
return self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)

kubernetes_asyncio/stream/ws_client_test.py

+14
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,17 @@ async def test_exec_ws_with_heartbeat(self):
108108
},
109109
heartbeat=30
110110
)
111+
112+
def test_parse_error_data_success(self):
113+
error_data = '{"metadata":{},"status":"Success"}'
114+
return_code = WsApiClient.parse_error_data(error_data)
115+
self.assertEqual(return_code, 0)
116+
117+
def test_parse_error_data_failure(self):
118+
error_data = (
119+
'{"metadata":{},"status":"Failure",'
120+
'"message":"command terminated with non-zero exit code",'
121+
'"reason":"NonZeroExitCode",'
122+
'"details":{"causes":[{"reason":"ExitCode","message":"1"}]}}')
123+
return_code = WsApiClient.parse_error_data(error_data)
124+
self.assertEqual(return_code, 1)

0 commit comments

Comments
 (0)