Skip to content

Commit e729a2e

Browse files
github-actions[bot]jason810496
authored andcommitted
Fix k8s test: scheduler crash when using LocalExecutor (#49677) (#49718)
(cherry picked from commit 2372dcd) Co-authored-by: LIU ZHE YOU <[email protected]>
1 parent ab47e7e commit e729a2e

File tree

3 files changed

+29
-16
lines changed

3 files changed

+29
-16
lines changed

kubernetes-tests/tests/kubernetes_tests/test_base.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from datetime import datetime, timezone
2525
from pathlib import Path
2626
from subprocess import check_call, check_output
27+
from typing import Literal
2728

2829
import pytest
2930
import requests
@@ -65,7 +66,7 @@ def base_tests_setup(self, request):
6566
# Replacement for unittests.TestCase.id()
6667
self.test_id = f"{request.node.cls.__name__}_{request.node.name}"
6768
# Ensure the api-server deployment is healthy at kubernetes level before calling the any API
68-
self.ensure_deployment_health("airflow-api-server")
69+
self.ensure_resource_health("airflow-api-server")
6970
try:
7071
self.session = self._get_session_with_retries()
7172
self._ensure_airflow_api_server_is_healthy()
@@ -227,12 +228,24 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state,
227228
assert state == expected_final_state
228229

229230
@staticmethod
230-
def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"):
231-
"""Watch the deployment until it is healthy."""
232-
deployment_rollout_status = check_output(
233-
["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"]
231+
def ensure_resource_health(
232+
resource_name: str,
233+
namespace: str = "airflow",
234+
resource_type: Literal["deployment", "statefulset"] = "deployment",
235+
):
236+
"""Watch the resource until it is healthy.
237+
Args:
238+
resource_name (str): Name of the resource to check.
239+
resource_type (str): Type of the resource (e.g., deployment, statefulset).
240+
namespace (str): Kubernetes namespace where the resource is located.
241+
"""
242+
rollout_status = check_output(
243+
["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
234244
).decode()
235-
assert "successfully rolled out" in deployment_rollout_status
245+
if resource_type == "deployment":
246+
assert "successfully rolled out" in rollout_status
247+
else:
248+
assert "roll out complete" in rollout_status
236249

237250
def ensure_dag_expected_state(self, host, logical_date, dag_id, expected_final_state, timeout):
238251
tries = 0

kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def test_integration_run_dag_with_scheduler_failure(self):
8181
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
8282

8383
self._delete_airflow_pod("scheduler")
84-
self.ensure_deployment_health("airflow-scheduler")
84+
self.ensure_resource_health("airflow-scheduler")
8585

8686
# Wait some time for the operator to complete
8787
self.monitor_task(

kubernetes-tests/tests/kubernetes_tests/test_other_executors.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@
2929
# Also, the skipping is necessary as there's no gain in running these tests in KubernetesExecutor
3030
@pytest.mark.skipif(EXECUTOR == "KubernetesExecutor", reason="Does not run on KubernetesExecutor")
3131
class TestCeleryAndLocalExecutor(BaseK8STest):
32-
@pytest.mark.xfail(
33-
EXECUTOR == "LocalExecutor",
34-
reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
35-
)
3632
def test_integration_run_dag(self):
3733
dag_id = "example_bash_operator"
3834
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
@@ -56,17 +52,21 @@ def test_integration_run_dag(self):
5652
timeout=300,
5753
)
5854

59-
@pytest.mark.xfail(
60-
EXECUTOR == "LocalExecutor",
61-
reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
62-
)
6355
def test_integration_run_dag_with_scheduler_failure(self):
6456
dag_id = "example_xcom"
6557

6658
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
6759

6860
self._delete_airflow_pod("scheduler")
69-
self.ensure_deployment_health("airflow-scheduler")
61+
62+
# Wait for the scheduler to be recreated
63+
if EXECUTOR == "CeleryExecutor":
64+
scheduler_resource_type = "deployment"
65+
elif EXECUTOR == "LocalExecutor":
66+
scheduler_resource_type = "statefulset"
67+
else:
68+
raise ValueError(f"Unknown executor {EXECUTOR}")
69+
self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)
7070

7171
# Wait some time for the operator to complete
7272
self.monitor_task(

0 commit comments

Comments
 (0)