Skip to content

Commit 176f41d

Browse files
authored
Merge pull request #22 from olitheolix/oli-k8s-errors
Stop the iterator for empty responses and do not process ERROR responses
2 parents fb68f98 + ce3f4f1 commit 176f41d

File tree

2 files changed

+114
-36
lines changed

2 files changed

+114
-36
lines changed

kubernetes_asyncio/watch/watch.py

+39-24
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import json
1616
import pydoc
1717
from functools import partial
18+
from types import SimpleNamespace
1819

1920
from kubernetes_asyncio import client
2021

@@ -28,26 +29,13 @@
2829
TYPE_LIST_SUFFIX = "List"
2930

3031

31-
class SimpleNamespace:
32-
33-
def __init__(self, **kwargs):
34-
self.__dict__.update(kwargs)
35-
36-
3732
def _find_return_type(func):
3833
for line in pydoc.getdoc(func).splitlines():
3934
if line.startswith(PYDOC_RETURN_LABEL):
4035
return line[len(PYDOC_RETURN_LABEL):].strip()
4136
return ""
4237

4338

44-
async def iter_resp_lines(resp):
45-
line = await resp.content.readline()
46-
if isinstance(line, bytes):
47-
line = line.decode('utf8')
48-
return line
49-
50-
5139
class Stream(object):
5240

5341
def __init__(self, func, *args, **kwargs):
@@ -60,7 +48,6 @@ def __init__(self, return_type=None):
6048
self._raw_return_type = return_type
6149
self._stop = False
6250
self._api_client = client.ApiClient()
63-
self.resource_version = 0
6451

6552
def stop(self):
6653
self._stop = True
@@ -69,18 +56,36 @@ def get_return_type(self, func):
6956
if self._raw_return_type:
7057
return self._raw_return_type
7158
return_type = _find_return_type(func)
59+
7260
if return_type.endswith(TYPE_LIST_SUFFIX):
7361
return return_type[:-len(TYPE_LIST_SUFFIX)]
7462
return return_type
7563

76-
def unmarshal_event(self, data, return_type):
64+
def unmarshal_event(self, data: str, response_type):
65+
"""Return the K8s response `data` in JSON format.
66+
67+
"""
7768
js = json.loads(data)
69+
70+
# Make a copy of the original object and save it under the
71+
# `raw_object` key because we will replace the data under `object` with
72+
# a Python native type shortly.
7873
js['raw_object'] = js['object']
79-
if return_type:
80-
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
81-
js['object'] = self._api_client.deserialize(obj, return_type)
82-
if hasattr(js['object'], 'metadata'):
83-
self.resource_version = js['object'].metadata.resource_version
74+
75+
# Something went wrong. A typical example would be that the user
76+
# supplied a resource version that was too old. In that case K8s would
77+
# not send a conventional ADDED/DELETED/... event but an error. Turn
78+
# this error into a Python exception to save the user the hassle.
79+
if js['type'].lower() == 'error':
80+
return js
81+
82+
# If possible, compile the JSON response into a Python native response
83+
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
84+
if response_type is not None:
85+
js['object'] = self._api_client.deserialize(
86+
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
87+
response_type=response_type
88+
)
8489
return js
8590

8691
def __aiter__(self):
@@ -90,15 +95,26 @@ async def __anext__(self):
9095
return await self.next()
9196

9297
async def next(self):
98+
# Set the response object to the user supplied function (eg
99+
# `list_namespaced_pods`) if this is the first iteration.
93100
if self.resp is None:
94101
self.resp = await self.func()
95102

103+
# Abort at the current iteration if the user has called `stop` on this
104+
# stream instance.
96105
if self._stop:
97106
raise StopAsyncIteration
98107

99-
ret = await iter_resp_lines(self.resp)
100-
ret = self.unmarshal_event(ret, self.return_type)
101-
return ret
108+
# Fetch the next K8s response.
109+
line = await self.resp.content.readline()
110+
line = line.decode('utf8')
111+
112+
# Stop the iterator if K8s sends an empty response. This happens when
113+
# eg the supplied timeout has expired.
114+
if line == '':
115+
raise StopAsyncIteration
116+
117+
return self.unmarshal_event(line, self.return_type)
102118

103119
def stream(self, func, *args, **kwargs):
104120
"""Watch an API resource and stream the result back via a generator.
@@ -129,7 +145,6 @@ def stream(self, func, *args, **kwargs):
129145
self.return_type = self.get_return_type(func)
130146
kwargs['watch'] = True
131147
kwargs['_preload_content'] = False
132-
timeouts = ('timeout_seconds' in kwargs)
133148

134149
self.func = partial(func, *args, **kwargs)
135150
self.resp = None

kubernetes_asyncio/watch/watch_test.py

+75-12
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from asynctest import CoroutineMock, Mock, TestCase, patch
15+
import json
16+
17+
from asynctest import CoroutineMock, Mock, TestCase
1618

1719
import kubernetes_asyncio
1820
from kubernetes_asyncio.watch import Watch
@@ -23,30 +25,68 @@ class WatchTest(TestCase):
2325
async def test_watch_with_decode(self):
2426
fake_resp = CoroutineMock()
2527
fake_resp.content.readline = CoroutineMock()
26-
fake_resp.content.readline.side_effect = [
27-
'{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}',
28-
'{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}',
29-
'{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}',
30-
'should_not_happened']
28+
side_effects = [
29+
{
30+
"type": "ADDED",
31+
"object": {
32+
"metadata": {"name": "test{}".format(uid)},
33+
"spec": {}, "status": {}
34+
}
35+
}
36+
for uid in range(3)
37+
]
38+
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
39+
side_effects.extend([AssertionError('Should not have been called')])
40+
fake_resp.content.readline.side_effect = side_effects
3141

3242
fake_api = Mock()
3343
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
3444
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
3545

3646
watch = kubernetes_asyncio.watch.Watch()
37-
count = 1
38-
async for e in watch.stream(fake_api.get_namespaces):
47+
count = 0
48+
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
3949
self.assertEqual("ADDED", e['type'])
4050
# make sure decoder worked and we got a model with the right name
4151
self.assertEqual("test%d" % count, e['object'].metadata.name)
52+
53+
# Stop the watch. This must not return the next event which would
54+
# be an AssertionError exception.
4255
count += 1
43-
# make sure we can stop the watch and the last event with won't be
44-
# returned
45-
if count == 4:
56+
if count == len(side_effects) - 1:
4657
watch.stop()
4758

4859
fake_api.get_namespaces.assert_called_once_with(
49-
_preload_content=False, watch=True)
60+
_preload_content=False, watch=True, resource_version='123')
61+
62+
async def test_watch_k8s_empty_response(self):
63+
"""Stop the iterator when the response is empty.
64+
65+
This typically happens when the user supplied timeout expires.
66+
67+
"""
68+
# Mock the readline return value to first return a valid response
69+
# followed by an empty response.
70+
fake_resp = CoroutineMock()
71+
fake_resp.content.readline = CoroutineMock()
72+
side_effects = [
73+
{"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}},
74+
{"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}},
75+
]
76+
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
77+
fake_resp.content.readline.side_effect = side_effects + [b'']
78+
79+
# Fake the K8s resource object to watch.
80+
fake_api = Mock()
81+
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
82+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
83+
84+
# Iteration must cease after all valid responses were received.
85+
watch = kubernetes_asyncio.watch.Watch()
86+
cnt = 0
87+
async for _ in watch.stream(fake_api.get_namespaces):
88+
cnt += 1
89+
self.assertEqual(cnt, len(side_effects))
5090

5191
def test_unmarshal_with_float_object(self):
5292
w = Watch()
@@ -64,6 +104,29 @@ def test_unmarshal_with_no_return_type(self):
64104
self.assertEqual(["test1"], event['object'])
65105
self.assertEqual(["test1"], event['raw_object'])
66106

107+
async def test_unmarshall_k8s_error_response(self):
108+
"""Never parse messages of type ERROR.
109+
110+
This test uses an actually recorded error, in this case for an outdated
111+
resource version.
112+
113+
"""
114+
# An actual error response sent by K8s during testing.
115+
k8s_err = {
116+
'type': 'ERROR',
117+
'object': {
118+
'kind': 'Status', 'apiVersion': 'v1', 'metadata': {},
119+
'status': 'Failure',
120+
'message': 'too old resource version: 1 (8146471)',
121+
'reason': 'Gone', 'code': 410
122+
}
123+
}
124+
125+
ret = Watch().unmarshal_event(json.dumps(k8s_err), None)
126+
self.assertEqual(ret['type'], k8s_err['type'])
127+
self.assertEqual(ret['object'], k8s_err['object'])
128+
self.assertEqual(ret['object'], k8s_err['object'])
129+
67130
async def test_watch_with_exception(self):
68131
fake_resp = CoroutineMock()
69132
fake_resp.content.readline = CoroutineMock()

0 commit comments

Comments
 (0)