Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 2d69e89

Browse files
authored
Merge pull request #109 from juliantaylor/fix-watch-reset
fix watching with a specified resource version
2 parents 8497dfb + 3c30a30 commit 2d69e89

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

watch/watch.py

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ def stream(self, func, *args, **kwargs):
124124
return_type = self.get_return_type(func)
125125
kwargs['watch'] = True
126126
kwargs['_preload_content'] = False
127+
if 'resource_version' in kwargs:
128+
self.resource_version = kwargs['resource_version']
127129

128130
timeouts = ('timeout_seconds' in kwargs)
129131
while True:

watch/watch_test.py

+72-1
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
import unittest
1818

19-
from mock import Mock
19+
from mock import Mock, call
2020

2121
from .watch import Watch
2222

2323

2424
class WatchTests(unittest.TestCase):
25+
def setUp(self):
26+
# counter for a test that needs test global state
27+
self.callcount = 0
2528

2629
def test_watch_with_decode(self):
2730
fake_resp = Mock()
@@ -64,6 +67,74 @@ def test_watch_with_decode(self):
6467
fake_resp.close.assert_called_once()
6568
fake_resp.release_conn.assert_called_once()
6669

70+
def test_watch_resource_version_set(self):
71+
# https://github.com/kubernetes-client/python/issues/700
72+
# ensure watching from a resource version does reset to resource
73+
# version 0 after k8s resets the watch connection
74+
fake_resp = Mock()
75+
fake_resp.close = Mock()
76+
fake_resp.release_conn = Mock()
77+
values = [
78+
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
79+
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
80+
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
81+
'"resourceVersion": "2"}, "spec": {}, "sta',
82+
'tus": {}}}\n'
83+
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
84+
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
85+
]
86+
# return nothing on the first call and values on the second
87+
# this emulates a watch from a rv that returns nothing in the first k8s
88+
# watch reset and values later
89+
90+
def get_values(*args, **kwargs):
91+
self.callcount += 1
92+
if self.callcount == 1:
93+
return []
94+
else:
95+
return values
96+
97+
fake_resp.read_chunked = Mock(
98+
side_effect=get_values)
99+
100+
fake_api = Mock()
101+
fake_api.get_namespaces = Mock(return_value=fake_resp)
102+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
103+
104+
w = Watch()
105+
# ensure we keep our requested resource version or the version latest
106+
# returned version when the existing versions are older than the
107+
# requested version
108+
# needed for the list existing objects, then watch from there use case
109+
calls = []
110+
111+
iterations = 2
112+
# first two calls must use the passed rv, the first call is a
113+
# "reset" and does not actually return anything
114+
# the second call must use the same rv but will return values
115+
# (with a wrong rv but a real cluster would behave correctly)
116+
# calls following that will use the rv from those returned values
117+
calls.append(call(_preload_content=False, watch=True,
118+
resource_version="5"))
119+
calls.append(call(_preload_content=False, watch=True,
120+
resource_version="5"))
121+
for i in range(iterations):
122+
# ideally we want 5 here but as rv must be treated as an
123+
# opaque value we cannot interpret it and order it so rely
124+
# on k8s returning the events completely and in order
125+
calls.append(call(_preload_content=False, watch=True,
126+
resource_version="3"))
127+
128+
for c, e in enumerate(w.stream(fake_api.get_namespaces,
129+
resource_version="5")):
130+
if c == len(values) * iterations:
131+
w.stop()
132+
133+
# check calls are in the list, gives good error output
134+
fake_api.get_namespaces.assert_has_calls(calls)
135+
# more strict test with worse error message
136+
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)
137+
67138
def test_watch_stream_twice(self):
68139
w = Watch(float)
69140
for step in ['first', 'second']:

0 commit comments

Comments
 (0)