Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop the iterator for empty responses and do not process ERROR responses #22

Merged
merged 7 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import pydoc
from functools import partial
from types import SimpleNamespace

from kubernetes_asyncio import client

Expand All @@ -28,26 +29,13 @@
TYPE_LIST_SUFFIX = "List"


class SimpleNamespace:

def __init__(self, **kwargs):
self.__dict__.update(kwargs)


def _find_return_type(func):
for line in pydoc.getdoc(func).splitlines():
if line.startswith(PYDOC_RETURN_LABEL):
return line[len(PYDOC_RETURN_LABEL):].strip()
return ""


async def iter_resp_lines(resp):
line = await resp.content.readline()
if isinstance(line, bytes):
line = line.decode('utf8')
return line


class Stream(object):

def __init__(self, func, *args, **kwargs):
Expand All @@ -60,7 +48,6 @@ def __init__(self, return_type=None):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0

def stop(self):
self._stop = True
Expand All @@ -69,18 +56,36 @@ def get_return_type(self, func):
if self._raw_return_type:
return self._raw_return_type
return_type = _find_return_type(func)

if return_type.endswith(TYPE_LIST_SUFFIX):
return return_type[:-len(TYPE_LIST_SUFFIX)]
return return_type

def unmarshal_event(self, data, return_type):
def unmarshal_event(self, data: str, response_type):
"""Return the K8s response `data` in JSON format.

"""
js = json.loads(data)

# Make a copy of the original object and save it under the
# `raw_object` key because we will replace the data under `object` with
# a Python native type shortly.
js['raw_object'] = js['object']
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version

# Something went wrong. A typical example would be that the user
# supplied a resource version that was too old. In that case K8s would
# not send a conventional ADDED/DELETED/... event but an error. Turn
# this error into a Python exception to save the user the hassle.
if js['type'].lower() == 'error':
return js

# If possible, compile the JSON response into a Python native response
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
if response_type is not None:
js['object'] = self._api_client.deserialize(
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
response_type=response_type
)
return js

def __aiter__(self):
Expand All @@ -90,15 +95,26 @@ async def __anext__(self):
return await self.next()

async def next(self):
# Set the response object to the user supplied function (eg
# `list_namespaced_pods`) if this is the first iteration.
if self.resp is None:
self.resp = await self.func()

# Abort at the current iteration if the user has called `stop` on this
# stream instance.
if self._stop:
raise StopAsyncIteration

ret = await iter_resp_lines(self.resp)
ret = self.unmarshal_event(ret, self.return_type)
return ret
# Fetch the next K8s response.
line = await self.resp.content.readline()
line = line.decode('utf8')

# Stop the iterator if K8s sends an empty response. This happens when
# eg the supplied timeout has expired.
if line == '':
raise StopAsyncIteration

return self.unmarshal_event(line, self.return_type)

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down Expand Up @@ -129,7 +145,6 @@ def stream(self, func, *args, **kwargs):
self.return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
timeouts = ('timeout_seconds' in kwargs)

self.func = partial(func, *args, **kwargs)
self.resp = None
Expand Down
86 changes: 74 additions & 12 deletions kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from asynctest import CoroutineMock, Mock, TestCase, patch
import json

from asynctest import CoroutineMock, Mock, TestCase

import kubernetes_asyncio
from kubernetes_asyncio.watch import Watch
Expand All @@ -23,30 +25,68 @@ class WatchTest(TestCase):
async def test_watch_with_decode(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
fake_resp.content.readline.side_effect = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}',
'should_not_happened']
side_effects = [
{
"type": "ADDED",
"object": {
"metadata": {"name": "test{}".format(uid)},
"spec": {}, "status": {}
}
}
for uid in range(3)
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
side_effects.extend([AssertionError('Should not have been called')])
fake_resp.content.readline.side_effect = side_effects

fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

watch = kubernetes_asyncio.watch.Watch()
count = 1
async for e in watch.stream(fake_api.get_namespaces):
count = 0
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
self.assertEqual("ADDED", e['type'])
# make sure decoder worked and we got a model with the right name
self.assertEqual("test%d" % count, e['object'].metadata.name)

# Stop the watch. This must not return the next event which would
# be an AssertionError exception.
count += 1
# make sure we can stop the watch and the last event with won't be
# returned
if count == 4:
if count == len(side_effects) - 1:
watch.stop()

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
_preload_content=False, watch=True, resource_version='123')

async def test_watch_k8s_empty_response(self):
"""Stop the iterator when the response is empty.

This typically happens when the user supplied timeout expires.

"""
# Mock the readline return value to first return a valid response
# followed by an empty response.
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
side_effects = [
{"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}},
{"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}},
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
fake_resp.content.readline.side_effect = side_effects + [b'']

# Fake the K8s resource object to watch.
fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

# Iteration must cease after all valid responses were received.
watch = kubernetes_asyncio.watch.Watch()
cnt = 0
async for _ in watch.stream(fake_api.get_namespaces):
cnt += 1
assert cnt == len(side_effects)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.assertEqual will be better here, it produces more meaningful errors.


def test_unmarshal_with_float_object(self):
w = Watch()
Expand All @@ -64,6 +104,28 @@ def test_unmarshal_with_no_return_type(self):
self.assertEqual(["test1"], event['object'])
self.assertEqual(["test1"], event['raw_object'])

async def test_unmarshall_k8s_error_response(self):
"""Never parse messages of type ERROR.

This test uses an actually recorded error, in this case for an outdated
resource version.

"""
# An actual error response sent by K8s during testing.
k8s_err = {
'type': 'ERROR',
'object': {
'kind': 'Status', 'apiVersion': 'v1', 'metadata': {},
'status': 'Failure',
'message': 'too old resource version: 1 (8146471)',
'reason': 'Gone', 'code': 410
}
}

ret = Watch().unmarshal_event(json.dumps(k8s_err), None)
assert ret['type'] == k8s_err['type']
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using assertEqual...

assert ret['object'] == ret['raw_object'] == k8s_err['object']

async def test_watch_with_exception(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
Expand Down