Skip to content

Commit 1539bd0

Browse files
authored
KubernetesJobWatcher no longer inherits from Process (apache#11017)
multiprocessing.Process is set up in a very unfortunate manner that pretty much makes it impossible to test a class that inherits from Process or use any of its internal functions. For this reason we decided to seperate the actual process based functionality into a class member
1 parent 1a46e9b commit 1539bd0

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

airflow/executors/kubernetes_executor.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def _get_security_context_val(self, scontext: str) -> Union[str, int]:
125125
return int(val)
126126

127127

128-
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
128+
class KubernetesJobWatcher(LoggingMixin):
129129
"""Watches for Kubernetes jobs"""
130130

131131
def __init__(self,
@@ -142,6 +142,31 @@ def __init__(self,
142142
self.watcher_queue = watcher_queue
143143
self.resource_version = resource_version
144144
self.kube_config = kube_config
145+
self.watcher_process = multiprocessing.Process(target=self.run, args=())
146+
147+
def start(self):
148+
"""
149+
Start the watcher process
150+
"""
151+
self.watcher_process.start()
152+
153+
def is_alive(self):
154+
"""
155+
Check if the watcher process is alive
156+
"""
157+
self.watcher_process.is_alive()
158+
159+
def join(self):
160+
"""
161+
Join watcher process
162+
"""
163+
self.watcher_process.join()
164+
165+
def terminate(self):
166+
"""
167+
Terminate watcher process
168+
"""
169+
self.watcher_process.terminate()
145170

146171
def run(self) -> None:
147172
"""Performs watching"""

tests/executors/test_kubernetes_executor.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
#
18+
import multiprocessing
1819
import random
1920
import re
2021
import string
@@ -30,14 +31,66 @@
3031
try:
3132
from kubernetes.client.rest import ApiException
3233

33-
from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubernetesExecutor
34+
from airflow.executors.kubernetes_executor import (
35+
AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher,
36+
)
3437
from airflow.kubernetes import pod_generator
3538
from airflow.kubernetes.pod_generator import PodGenerator
3639
from airflow.utils.state import State
3740
except ImportError:
3841
AirflowKubernetesScheduler = None # type: ignore
3942

4043

44+
class TestKubernetesJobWatcher(unittest.TestCase):
45+
def setUp(self) -> None:
46+
self.watcher_queue = multiprocessing.Manager().Queue()
47+
self.watcher = KubernetesJobWatcher(
48+
namespace="namespace",
49+
multi_namespace_mode=False,
50+
watcher_queue=self.watcher_queue,
51+
resource_version="0",
52+
worker_uuid="0",
53+
kube_config=None,
54+
)
55+
56+
def test_running_task(self):
57+
self.watcher.process_status(
58+
pod_id="pod_id",
59+
namespace="namespace",
60+
status="Running",
61+
annotations={"foo": "bar"},
62+
resource_version="5",
63+
event={"type": "ADDED"}
64+
)
65+
self.assertTrue(self.watcher_queue.empty())
66+
67+
def test_succeeded_task(self):
68+
self.watcher.process_status(
69+
pod_id="pod_id",
70+
namespace="namespace",
71+
status="Succeeded",
72+
annotations={"foo": "bar"},
73+
resource_version="5",
74+
event={"type": "ADDED"}
75+
)
76+
result = self.watcher_queue.get_nowait()
77+
self.assertEqual(('pod_id', 'namespace', None, {'foo': 'bar'}, '5'), result)
78+
self.assertTrue(self.watcher_queue.empty())
79+
80+
def test_failed_task(self):
81+
self.watcher.process_status(
82+
pod_id="pod_id",
83+
namespace="namespace",
84+
status="Failed",
85+
annotations={"foo": "bar"},
86+
resource_version="5",
87+
event={"type": "ADDED"}
88+
)
89+
result = self.watcher_queue.get_nowait()
90+
self.assertEqual(('pod_id', 'namespace', "failed", {'foo': 'bar'}, '5'), result)
91+
self.assertTrue(self.watcher_queue.empty())
92+
93+
4194
# pylint: disable=unused-argument
4295
class TestAirflowKubernetesScheduler(unittest.TestCase):
4396
@staticmethod

0 commit comments

Comments
 (0)