Skip to content

Add istio test, use curl /quitquitquit to exit sidecar, and some othe… #33306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import logging
import re
import secrets
import shlex
import string
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Sequence

from kubernetes.client import CoreV1Api, models as k8s
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from slugify import slugify
from urllib3.exceptions import HTTPError

Expand Down Expand Up @@ -59,6 +61,7 @@
PodManager,
PodOperatorHookProtocol,
PodPhase,
container_is_succeeded,
get_container_termination_message,
)
from airflow.settings import pod_mutation_hook
Expand Down Expand Up @@ -246,7 +249,7 @@ class KubernetesPodOperator(BaseOperator):

# This field can be overloaded at the instance level via base_container_name
BASE_CONTAINER_NAME = "base"

ISTIO_CONTAINER_NAME = "istio-proxy"
POD_CHECKED_KEY = "already_checked"
POST_TERMINATION_TIMEOUT = 120

Expand Down Expand Up @@ -604,7 +607,10 @@ def execute_sync(self, context: Context):
if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
result = self.extract_xcom(pod=self.pod)
self.remote_pod = self.pod_manager.await_pod_completion(self.pod)
istio_enabled = self.is_istio_enabled(self.pod)
self.remote_pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
finally:
self.cleanup(
pod=self.pod or self.pod_request_obj,
Expand Down Expand Up @@ -667,7 +673,8 @@ def execute_complete(self, context: Context, event: dict, **kwargs):
xcom_sidecar_output = self.extract_xcom(pod=pod)
return xcom_sidecar_output
finally:
pod = self.pod_manager.await_pod_completion(pod)
istio_enabled = self.is_istio_enabled(pod)
pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
if pod is not None:
self.post_complete_action(
pod=pod,
Expand Down Expand Up @@ -699,13 +706,16 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs):
)

def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
istio_enabled = self.is_istio_enabled(remote_pod)
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None

# if the pod fails or success, but we don't want to delete it
if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
self.patch_already_checked(remote_pod, reraise=False)

if pod_phase != PodPhase.SUCCEEDED:
if (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name)
):
if self.log_events_on_failure:
self._read_pod_events(pod, reraise=False)

Expand Down Expand Up @@ -752,16 +762,61 @@ def _read_pod_events(self, pod, *, reraise=True):
for event in self.pod_manager.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)

def is_istio_enabled(self, pod: V1Pod) -> bool:
"""Checks if istio is enabled for the namespace of the pod by inspecting the namespace labels."""
if not pod:
return False

remote_pod = self.pod_manager.read_pod(pod)

for container in remote_pod.spec.containers:
if container.name == self.ISTIO_CONTAINER_NAME:
return True

return False

def kill_istio_sidecar(self, pod: V1Pod) -> None:
command = "/bin/sh -c curl -fsI -X POST http://localhost:15020/quitquitquit && exit 0"
command_to_container = shlex.split(command)
try:
resp = stream(
self.client.connect_get_namespaced_pod_exec,
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container=self.ISTIO_CONTAINER_NAME,
command=command_to_container,
stderr=True,
stdin=True,
stdout=True,
tty=False,
_preload_content=False,
)
resp.close()
except Exception as e:
self.log.error("Error while deleting istio-proxy sidecar: %s", e)
raise e

def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
istio_enabled = self.is_istio_enabled(pod)
with _optionally_suppress(reraise=reraise):
if pod is not None:
should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and pod.status.phase == PodPhase.SUCCEEDED
should_delete_pod = (
(self.on_finish_action == OnFinishAction.DELETE_POD)
or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and pod.status.phase == PodPhase.SUCCEEDED
)
or (
self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
and container_is_succeeded(pod, self.base_container_name)
)
)
if should_delete_pod:
if should_delete_pod and not istio_enabled:
self.log.info("Deleting pod: %s", pod.metadata.name)
self.pod_manager.delete_pod(pod)
elif should_delete_pod and istio_enabled:
self.log.info("Deleting istio-proxy sidecar inside %s: ", pod.metadata.name)
self.kill_istio_sidecar(pod)
else:
self.log.info("Skipping deleting pod: %s", pod.metadata.name)

Expand Down
23 changes: 22 additions & 1 deletion airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ def container_is_completed(pod: V1Pod, container_name: str) -> bool:
return container_status.state.terminated is not None


def container_is_succeeded(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded.

If that container is present and completed and succeeded, returns True. Returns False otherwise.
"""
if not container_is_completed(pod, container_name):
return False

container_status = get_container_status(pod, container_name)
if not container_status:
return False
return container_status.state.terminated.exit_code == 0


def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated.
Expand Down Expand Up @@ -508,17 +523,23 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
self.log.info("Waiting for container '%s' state to be completed", container_name)
time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
def await_pod_completion(
self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base"
) -> V1Pod:
"""
Monitors a pod and returns the final state.

:param istio_enabled: whether istio is enabled in the namespace
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod
:return: tuple[State, str | None]
"""
while True:
remote_pod = self.read_pod(pod)
if remote_pod.status.phase in PodPhase.terminal_states:
break
if istio_enabled and container_is_completed(remote_pod, container_name):
break
self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
time.sleep(2)
return remote_pod
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ isfile
ish
isn
isort
istio
iterable
iterables
iteratively
Expand Down
82 changes: 82 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,88 @@ def test_termination_message_policy_default_value_correctly_set(self):
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].termination_message_policy == "File"

@pytest.mark.parametrize(
"task_kwargs, base_container_fail, expect_to_delete_pod",
[
({"on_finish_action": "delete_pod"}, True, True),
({"on_finish_action": "delete_pod"}, False, True),
({"on_finish_action": "keep_pod"}, False, False),
({"on_finish_action": "keep_pod"}, True, False),
({"on_finish_action": "delete_succeeded_pod"}, False, True),
({"on_finish_action": "delete_succeeded_pod"}, True, False),
],
)
@patch(f"{KPO_MODULE}.KubernetesPodOperator.kill_istio_sidecar")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.is_istio_enabled")
@patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
def test_pod_with_istio_delete_after_await_container_error(
self,
find_pod_mock,
await_pod_completion_mock,
is_istio_enabled_mock,
kill_istio_sidecar_mock,
task_kwargs,
base_container_fail,
expect_to_delete_pod,
):
"""
When KPO fails unexpectedly during await_container, we should still try to delete the pod,
and the pod we try to delete should be the one returned from find_pod earlier.
"""
sidecar = MagicMock()
sidecar.name = "istio-proxy"
sidecar.namespace = "default"
sidecar.image = "istio/proxyv2:1.18.2"
sidecar.args = []
sidecar.state.running = True

cont_status_1 = MagicMock()
cont_status_1.name = "base"
cont_status_1.state.running = False
cont_status_1.state.terminated.exit_code = 0
if base_container_fail:
cont_status_1.state.terminated.exit_code = 1
cont_status_1.state.terminated.message = "my-failure"

cont_status_2 = MagicMock()
cont_status_2.name = "istio-proxy"
cont_status_2.state.running = True
cont_status_2.state.terminated = False

await_pod_completion_mock.return_value.spec.containers = [sidecar]
await_pod_completion_mock.return_value.status.phase = "Running"
await_pod_completion_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
await_pod_completion_mock.return_value.metadata.name = "pod-with-istio-sidecar"
await_pod_completion_mock.return_value.metadata.namespace = "default"

find_pod_mock.return_value.spec.containers = [sidecar]
find_pod_mock.return_value.status.phase = "Running"
find_pod_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
find_pod_mock.return_value.metadata.name = "pod-with-istio-sidecar"
find_pod_mock.return_value.metadata.namespace = "default"

k = KubernetesPodOperator(task_id="task", **task_kwargs)

context = create_context(k)
context["ti"].xcom_push = MagicMock()
if base_container_fail:
self.await_pod_mock.side_effect = AirflowException("fake failure")
with pytest.raises(AirflowException, match="my-failure"):
k.execute(context=context)
else:
k.execute(context=context)

assert is_istio_enabled_mock(find_pod_mock.return_value)
if task_kwargs["on_finish_action"] == "delete_pod":
kill_istio_sidecar_mock.assert_called_with(await_pod_completion_mock.return_value)
elif expect_to_delete_pod and base_container_fail:
kill_istio_sidecar_mock.assert_called_with(find_pod_mock.return_value)
elif expect_to_delete_pod and not base_container_fail:
kill_istio_sidecar_mock.assert_called_with(await_pod_completion_mock.return_value)
else:
kill_istio_sidecar_mock.assert_not_called()

@pytest.mark.parametrize(
"task_kwargs, should_be_deleted",
[
Expand Down
59 changes: 59 additions & 0 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
from datetime import datetime
from json.decoder import JSONDecodeError
from types import SimpleNamespace
from unittest import mock
from unittest.mock import MagicMock

Expand All @@ -36,6 +37,7 @@
PodManager,
PodPhase,
container_is_running,
container_is_succeeded,
container_is_terminated,
)
from airflow.utils.timezone import utc
Expand Down Expand Up @@ -693,3 +695,60 @@ def test_read_pod(
# second read
with time_machine.travel(mock_read_pod_at_1):
assert consumer.read_pod() == expected_read_pods[1]


def params_for_test_container_is_succeeded():
"""The `container_is_succeeded` method is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This function
emits params used in `test_container_is_succeeded` to verify this behavior.
We create mock classes not derived from MagicMock because with an instance `e` of MagicMock,
tests like `e.hello is not None` are always True.
"""

class RemotePodMock:
pass

class ContainerStatusMock:
def __init__(self, name):
self.name = name

def remote_pod(succeeded=None, not_succeeded=None):
e = RemotePodMock()
e.status = RemotePodMock()
e.status.container_statuses = []
for r in not_succeeded or []:
e.status.container_statuses.append(container(r, False))
for r in succeeded or []:
e.status.container_statuses.append(container(r, True))
return e

def container(name, succeeded):
c = ContainerStatusMock(name)
c.state = RemotePodMock()
c.state.terminated = SimpleNamespace(**{"exit_code": 0}) if succeeded else None
return c

pod_mock_list = []
pod_mock_list.append(pytest.param(None, False, id="None remote_pod"))
p = RemotePodMock()
p.status = None
pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status"))
p = RemotePodMock()
p.status = RemotePodMock()
p.status.container_statuses = []
pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses"))
pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty"))
pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded"))
pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not succeeded"))
pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 succeeded"))
return pod_mock_list


@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_succeeded())
def test_container_is_succeeded(remote_pod, result):
"""The `container_is_succeeded` function is designed to handle an assortment of bad objects
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None,
an object `e` such that `e.status.container_statuses` is None, and so on. This test
verifies the expected behavior."""
assert container_is_succeeded(remote_pod, "base") is result