Skip to content

Commit e991f60

Browse files
Add active_deadline_seconds parameter to KubernetesPodOperator (#33379)
* Inserting active_deadline_seconds in KPO * Fixing tests * Fix active_deadline_seconds test * Fixing tests * parametrize task_id to create a pod per task --------- Co-authored-by: Hussein Awala <[email protected]>
1 parent 46aa429 commit e991f60

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator):
240240
Deprecated - use `on_finish_action` instead.
241241
:param termination_message_policy: The termination message policy of the base container.
242242
Default value is "File"
243+
:param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds
244+
in V1PodSpec.
243245
"""
244246

245247
# This field can be overloaded at the instance level via base_container_name
@@ -320,6 +322,7 @@ def __init__(
320322
on_finish_action: str = "delete_pod",
321323
is_delete_operator_pod: None | bool = None,
322324
termination_message_policy: str = "File",
325+
active_deadline_seconds: int | None = None,
323326
**kwargs,
324327
) -> None:
325328
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
@@ -417,6 +420,7 @@ def __init__(
417420
self.on_finish_action = OnFinishAction(on_finish_action)
418421
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
419422
self.termination_message_policy = termination_message_policy
423+
self.active_deadline_seconds = active_deadline_seconds
420424

421425
self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
422426

@@ -860,6 +864,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
860864
restart_policy="Never",
861865
priority_class_name=self.priority_class_name,
862866
volumes=self.volumes,
867+
active_deadline_seconds=self.active_deadline_seconds,
863868
),
864869
)
865870

kubernetes_tests/test_kubernetes_pod_operator.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import pendulum
3131
import pytest
32+
from kubernetes import client
3233
from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s
3334
from kubernetes.client.api_client import ApiClient
3435
from kubernetes.client.rest import ApiException
@@ -43,6 +44,7 @@
4344
from airflow.utils.context import Context
4445
from airflow.utils.types import DagRunType
4546
from airflow.version import version as airflow_version
47+
from kubernetes_tests.test_base import BaseK8STest
4648

4749
HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
4850
POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
@@ -1331,3 +1333,32 @@ def __getattr__(self, name):
13311333
task.render_template_fields(context=context)
13321334
assert "password" in caplog.text
13331335
assert "secretpassword" not in caplog.text
1336+
1337+
1338+
class TestKubernetesPodOperator(BaseK8STest):
1339+
@pytest.mark.parametrize("active_deadline_seconds", [10, 20])
1340+
def test_kubernetes_pod_operator_active_deadline_seconds(self, active_deadline_seconds):
1341+
k = KubernetesPodOperator(
1342+
task_id=f"test_task_{active_deadline_seconds}",
1343+
active_deadline_seconds=active_deadline_seconds,
1344+
image="busybox",
1345+
cmds=["sh", "-c", "echo 'hello world' && sleep 60"],
1346+
namespace="default",
1347+
on_finish_action="keep_pod",
1348+
)
1349+
1350+
context = create_context(k)
1351+
1352+
with pytest.raises(AirflowException):
1353+
k.execute(context)
1354+
1355+
pod = k.find_pod("default", context, exclude_checked=False)
1356+
1357+
k8s_client = client.CoreV1Api()
1358+
1359+
pod_status = k8s_client.read_namespaced_pod_status(name=pod.metadata.name, namespace="default")
1360+
phase = pod_status.status.phase
1361+
reason = pod_status.status.reason
1362+
1363+
assert phase == "Failed"
1364+
assert reason == "DeadlineExceeded"

0 commit comments

Comments
 (0)