Skip to content

Commit e375f02

Browse files
authored
Merge pull request #30 from tomplus/feat/watch-forever
feat: watch work forever if timeout is not specified
2 parents 12e3374 + ef5152a commit e375f02

File tree

4 files changed

+95
-23
lines changed

4 files changed

+95
-23
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pip install -r test-requirements.txt
7575
You can run the style checks and tests with
7676

7777
```bash
78-
flake8 && isort -c
78+
flake8 kubernetes_asyncio/
79+
isort --diff --recursive kubernetes_asyncio/
7980
nosetests
8081
```
Binary file not shown.

kubernetes_asyncio/watch/watch.py

+47-20
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
1516
import json
1617
import pydoc
1718
from functools import partial
@@ -48,6 +49,7 @@ def __init__(self, return_type=None):
4849
self._raw_return_type = return_type
4950
self._stop = False
5051
self._api_client = client.ApiClient()
52+
self.resource_version = 0
5153

5254
def stop(self):
5355
self._stop = True
@@ -86,6 +88,19 @@ def unmarshal_event(self, data: str, response_type):
8688
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
8789
response_type=response_type
8890
)
91+
92+
# decode and save resource_version to continue watching
93+
if hasattr(js['object'], 'metadata'):
94+
self.resource_version = js['object'].metadata.resource_version
95+
96+
# For custom objects that we don't have model defined, json
97+
# deserialization results in dictionary
98+
elif (isinstance(js['object'], dict) and
99+
'metadata' in js['object'] and
100+
'resourceVersion' in js['object']['metadata']):
101+
102+
self.resource_version = js['object']['metadata']['resourceVersion']
103+
89104
return js
90105

91106
def __aiter__(self):
@@ -95,26 +110,38 @@ async def __anext__(self):
95110
return await self.next()
96111

97112
async def next(self):
98-
# Set the response object to the user supplied function (eg
99-
# `list_namespaced_pods`) if this is the first iteration.
100-
if self.resp is None:
101-
self.resp = await self.func()
102-
103-
# Abort at the current iteration if the user has called `stop` on this
104-
# stream instance.
105-
if self._stop:
106-
raise StopAsyncIteration
107-
108-
# Fetch the next K8s response.
109-
line = await self.resp.content.readline()
110-
line = line.decode('utf8')
111-
112-
# Stop the iterator if K8s sends an empty response. This happens when
113-
# eg the supplied timeout has expired.
114-
if line == '':
115-
raise StopAsyncIteration
116-
117-
return self.unmarshal_event(line, self.return_type)
113+
114+
while 1:
115+
116+
# Set the response object to the user supplied function (eg
117+
# `list_namespaced_pods`) if this is the first iteration.
118+
if self.resp is None:
119+
self.resp = await self.func()
120+
121+
# Abort at the current iteration if the user has called `stop` on this
122+
# stream instance.
123+
if self._stop:
124+
raise StopAsyncIteration
125+
126+
# Fetch the next K8s response.
127+
try:
128+
line = await self.resp.content.readline()
129+
except asyncio.TimeoutError:
130+
if 'timeout_seconds' not in self.func.keywords:
131+
self.resp = None
132+
self.func.keywords['resource_version'] = self.resource_version
133+
continue
134+
else:
135+
raise
136+
137+
line = line.decode('utf8')
138+
139+
# Stop the iterator if K8s sends an empty response. This happens when
140+
# eg the supplied timeout has expired.
141+
if line == '':
142+
raise StopAsyncIteration
143+
144+
return self.unmarshal_event(line, self.return_type)
118145

119146
def stream(self, func, *args, **kwargs):
120147
"""Watch an API resource and stream the result back via a generator.

kubernetes_asyncio/watch/watch_test.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
1516
import json
1617

17-
from asynctest import CoroutineMock, Mock, TestCase
18+
from asynctest import CoroutineMock, Mock, TestCase, call
1819

1920
import kubernetes_asyncio
2021
from kubernetes_asyncio.watch import Watch
@@ -29,7 +30,8 @@ async def test_watch_with_decode(self):
2930
{
3031
"type": "ADDED",
3132
"object": {
32-
"metadata": {"name": "test{}".format(uid)},
33+
"metadata": {"name": "test{}".format(uid),
34+
"resourceVersion": str(uid)},
3335
"spec": {}, "status": {}
3436
}
3537
}
@@ -49,6 +51,9 @@ async def test_watch_with_decode(self):
4951
self.assertEqual("ADDED", e['type'])
5052
# make sure decoder worked and we got a model with the right name
5153
self.assertEqual("test%d" % count, e['object'].metadata.name)
54+
# make sure decoder worked and updated Watch.resource_version
55+
self.assertEqual(e['object'].metadata.resource_version, str(count))
56+
self.assertEqual(watch.resource_version, str(count))
5257

5358
# Stop the watch. This must not return the next event which would
5459
# be an AssertionError exception.
@@ -127,6 +132,19 @@ async def test_unmarshall_k8s_error_response(self):
127132
self.assertEqual(ret['object'], k8s_err['object'])
128133
self.assertEqual(ret['object'], k8s_err['object'])
129134

135+
def test_unmarshal_with_custom_object(self):
136+
w = Watch()
137+
event = w.unmarshal_event('{"type": "ADDED", "object": {"apiVersion":'
138+
'"test.com/v1beta1","kind":"foo","metadata":'
139+
'{"name": "bar", "resourceVersion": "1"}}}',
140+
'object')
141+
self.assertEqual("ADDED", event['type'])
142+
# make sure decoder deserialized json into dictionary and updated
143+
# Watch.resource_version
144+
self.assertTrue(isinstance(event['object'], dict))
145+
self.assertEqual("1", event['object']['metadata']['resourceVersion'])
146+
self.assertEqual("1", w.resource_version)
147+
130148
async def test_watch_with_exception(self):
131149
fake_resp = CoroutineMock()
132150
fake_resp.content.readline = CoroutineMock()
@@ -140,6 +158,32 @@ async def test_watch_with_exception(self):
140158
async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa
141159
pass
142160

161+
async def test_watch_timeout(self):
162+
fake_resp = CoroutineMock()
163+
fake_resp.content.readline = CoroutineMock()
164+
165+
mock_event = {"type": "ADDED",
166+
"object": {"metadata": {"name": "test1555",
167+
"resourceVersion": "1555"},
168+
"spec": {},
169+
"status": {}}}
170+
171+
fake_resp.content.readline.side_effect = [json.dumps(mock_event).encode('utf8'),
172+
asyncio.TimeoutError(),
173+
b""]
174+
175+
fake_api = Mock()
176+
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
177+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
178+
179+
watch = kubernetes_asyncio.watch.Watch()
180+
async for e in watch.stream(fake_api.get_namespaces): # noqa
181+
pass
182+
183+
fake_api.get_namespaces.assert_has_calls(
184+
[call(_preload_content=False, watch=True),
185+
call(_preload_content=False, watch=True, resource_version='1555')])
186+
143187

144188
if __name__ == '__main__':
145189
import asynctest

0 commit comments

Comments
 (0)