diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 7a1cd975e6931..df73ac166e1b5 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -22,6 +22,7 @@ import logging import re import secrets +import shlex import string import warnings from collections.abc import Container @@ -29,7 +30,8 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Iterable, Sequence -from kubernetes.client import CoreV1Api, models as k8s +from kubernetes.client import CoreV1Api, V1Pod, models as k8s +from kubernetes.stream import stream from slugify import slugify from urllib3.exceptions import HTTPError @@ -59,6 +61,7 @@ PodManager, PodOperatorHookProtocol, PodPhase, + container_is_succeeded, get_container_termination_message, ) from airflow.settings import pod_mutation_hook @@ -246,7 +249,7 @@ class KubernetesPodOperator(BaseOperator): # This field can be overloaded at the instance level via base_container_name BASE_CONTAINER_NAME = "base" - + ISTIO_CONTAINER_NAME = "istio-proxy" POD_CHECKED_KEY = "already_checked" POST_TERMINATION_TIMEOUT = 120 @@ -604,7 +607,10 @@ def execute_sync(self, context: Context): if self.do_xcom_push: self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod) result = self.extract_xcom(pod=self.pod) - self.remote_pod = self.pod_manager.await_pod_completion(self.pod) + istio_enabled = self.is_istio_enabled(self.pod) + self.remote_pod = self.pod_manager.await_pod_completion( + self.pod, istio_enabled, self.base_container_name + ) finally: self.cleanup( pod=self.pod or self.pod_request_obj, @@ -667,7 +673,8 @@ def execute_complete(self, context: Context, event: dict, **kwargs): xcom_sidecar_output = self.extract_xcom(pod=pod) return xcom_sidecar_output finally: - pod = self.pod_manager.await_pod_completion(pod) + istio_enabled = self.is_istio_enabled(pod) + pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name) if pod is not None: self.post_complete_action( pod=pod, @@ -699,13 +706,16 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs): ) def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + istio_enabled = self.is_istio_enabled(remote_pod) pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None # if the pod fails or success, but we don't want to delete it if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD: self.patch_already_checked(remote_pod, reraise=False) - if pod_phase != PodPhase.SUCCEEDED: + if (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or ( + istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name) + ): if self.log_events_on_failure: self._read_pod_events(pod, reraise=False) @@ -752,16 +762,61 @@ def _read_pod_events(self, pod, *, reraise=True): for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) + def is_istio_enabled(self, pod: V1Pod) -> bool: + """Checks if istio is enabled for the namespace of the pod by inspecting the namespace labels.""" + if not pod: + return False + + remote_pod = self.pod_manager.read_pod(pod) + + for container in remote_pod.spec.containers: + if container.name == self.ISTIO_CONTAINER_NAME: + return True + + return False + + def kill_istio_sidecar(self, pod: V1Pod) -> None: + command = "/bin/sh -c curl -fsI -X POST http://localhost:15020/quitquitquit && exit 0" + command_to_container = shlex.split(command) + try: + resp = stream( + self.client.connect_get_namespaced_pod_exec, + name=pod.metadata.name, + namespace=pod.metadata.namespace, + container=self.ISTIO_CONTAINER_NAME, + command=command_to_container, + stderr=True, + stdin=True, + stdout=True, + tty=False, + _preload_content=False, + ) + resp.close() + except Exception as e: + self.log.error("Error while deleting istio-proxy sidecar: %s", e) + raise e + def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True): + istio_enabled = self.is_istio_enabled(pod) with _optionally_suppress(reraise=reraise): if pod is not None: - should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or ( - self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD - and pod.status.phase == PodPhase.SUCCEEDED + should_delete_pod = ( + (self.on_finish_action == OnFinishAction.DELETE_POD) + or ( + self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD + and pod.status.phase == PodPhase.SUCCEEDED + ) + or ( + self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD + and container_is_succeeded(pod, self.base_container_name) + ) ) - if should_delete_pod: + if should_delete_pod and not istio_enabled: self.log.info("Deleting pod: %s", pod.metadata.name) self.pod_manager.delete_pod(pod) + elif should_delete_pod and istio_enabled: + self.log.info("Deleting istio-proxy sidecar inside %s: ", pod.metadata.name) + self.kill_istio_sidecar(pod) else: self.log.info("Skipping deleting pod: %s", pod.metadata.name) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 650be681d3ee8..3a84ea22d3135 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -145,6 +145,21 @@ def container_is_completed(pod: V1Pod, container_name: str) -> bool: return container_status.state.terminated is not None +def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + + If that container is present and completed and succeeded, returns True. Returns False otherwise. + """ + if not container_is_completed(pod, container_name): + return False + + container_status = get_container_status(pod, container_name) + if not container_status: + return False + return container_status.state.terminated.exit_code == 0 + + def container_is_terminated(pod: V1Pod, container_name: str) -> bool: """ Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated. @@ -508,17 +523,23 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: self.log.info("Waiting for container '%s' state to be completed", container_name) time.sleep(1) - def await_pod_completion(self, pod: V1Pod) -> V1Pod: + def await_pod_completion( + self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base" + ) -> V1Pod: """ Monitors a pod and returns the final state. + :param istio_enabled: whether istio is enabled in the namespace :param pod: pod spec that will be monitored + :param container_name: name of the container within the pod :return: tuple[State, str | None] """ while True: remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break + if istio_enabled and container_is_completed(remote_pod, container_name): + break self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 7a48585982c62..ec01067b12bfe 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -789,6 +789,7 @@ isfile ish isn isort +istio iterable iterables iteratively diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 7b40103dac539..ae02e384bb623 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -614,6 +614,88 @@ def test_termination_message_policy_default_value_correctly_set(self): pod = k.build_pod_request_obj(create_context(k)) assert pod.spec.containers[0].termination_message_policy == "File" + @pytest.mark.parametrize( + "task_kwargs, base_container_fail, expect_to_delete_pod", + [ + ({"on_finish_action": "delete_pod"}, True, True), + ({"on_finish_action": "delete_pod"}, False, True), + ({"on_finish_action": "keep_pod"}, False, False), + ({"on_finish_action": "keep_pod"}, True, False), + ({"on_finish_action": "delete_succeeded_pod"}, False, True), + ({"on_finish_action": "delete_succeeded_pod"}, True, False), + ], + ) + @patch(f"{KPO_MODULE}.KubernetesPodOperator.kill_istio_sidecar") + @patch(f"{KPO_MODULE}.KubernetesPodOperator.is_istio_enabled") + @patch(f"{POD_MANAGER_CLASS}.await_pod_completion") + @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod") + def test_pod_with_istio_delete_after_await_container_error( + self, + find_pod_mock, + await_pod_completion_mock, + is_istio_enabled_mock, + kill_istio_sidecar_mock, + task_kwargs, + base_container_fail, + expect_to_delete_pod, + ): + """ + When KPO fails unexpectedly during await_container, we should still try to delete the pod, + and the pod we try to delete should be the one returned from find_pod earlier. + """ + sidecar = MagicMock() + sidecar.name = "istio-proxy" + sidecar.namespace = "default" + sidecar.image = "istio/proxyv2:1.18.2" + sidecar.args = [] + sidecar.state.running = True + + cont_status_1 = MagicMock() + cont_status_1.name = "base" + cont_status_1.state.running = False + cont_status_1.state.terminated.exit_code = 0 + if base_container_fail: + cont_status_1.state.terminated.exit_code = 1 + cont_status_1.state.terminated.message = "my-failure" + + cont_status_2 = MagicMock() + cont_status_2.name = "istio-proxy" + cont_status_2.state.running = True + cont_status_2.state.terminated = False + + await_pod_completion_mock.return_value.spec.containers = [sidecar] + await_pod_completion_mock.return_value.status.phase = "Running" + await_pod_completion_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2] + await_pod_completion_mock.return_value.metadata.name = "pod-with-istio-sidecar" + await_pod_completion_mock.return_value.metadata.namespace = "default" + + find_pod_mock.return_value.spec.containers = [sidecar] + find_pod_mock.return_value.status.phase = "Running" + find_pod_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2] + find_pod_mock.return_value.metadata.name = "pod-with-istio-sidecar" + find_pod_mock.return_value.metadata.namespace = "default" + + k = KubernetesPodOperator(task_id="task", **task_kwargs) + + context = create_context(k) + context["ti"].xcom_push = MagicMock() + if base_container_fail: + self.await_pod_mock.side_effect = AirflowException("fake failure") + with pytest.raises(AirflowException, match="my-failure"): + k.execute(context=context) + else: + k.execute(context=context) + + assert is_istio_enabled_mock(find_pod_mock.return_value) + if task_kwargs["on_finish_action"] == "delete_pod": + kill_istio_sidecar_mock.assert_called_with(await_pod_completion_mock.return_value) + elif expect_to_delete_pod and base_container_fail: + kill_istio_sidecar_mock.assert_called_with(find_pod_mock.return_value) + elif expect_to_delete_pod and not base_container_fail: + kill_istio_sidecar_mock.assert_called_with(await_pod_completion_mock.return_value) + else: + kill_istio_sidecar_mock.assert_not_called() + @pytest.mark.parametrize( "task_kwargs, should_be_deleted", [ diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index c915a03a4aa08..c6c42105b367b 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -19,6 +19,7 @@ import logging from datetime import datetime from json.decoder import JSONDecodeError +from types import SimpleNamespace from unittest import mock from unittest.mock import MagicMock @@ -36,6 +37,7 @@ PodManager, PodPhase, container_is_running, + container_is_succeeded, container_is_terminated, ) from airflow.utils.timezone import utc @@ -693,3 +695,60 @@ def test_read_pod( # second read with time_machine.travel(mock_read_pod_at_1): assert consumer.read_pod() == expected_read_pods[1] + + +def params_for_test_container_is_succeeded(): + """The `container_is_succeeded` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_succeeded` to verify this behavior. + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(succeeded=None, not_succeeded=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_succeeded or []: + e.status.container_statuses.append(container(r, False)) + for r in succeeded or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, succeeded): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = SimpleNamespace(**{"exit_code": 0}) if succeeded else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id="None remote_pod")) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status")) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses")) + pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty")) + pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded")) + pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not succeeded")) + pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 succeeded")) + return pod_mock_list + + +@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_succeeded()) +def test_container_is_succeeded(remote_pod, result): + """The `container_is_succeeded` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_succeeded(remote_pod, "base") is result