Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.5
What happened?
When a worker terminates abruptly, the task instances (more precisely, their corresponding `LocalTaskJob`s) on that worker stop producing heartbeats. Those TIs are then found and purged as task instances without heartbeats (zombies). The scheduler looks for such TIs every 10 seconds by default (configured by `zombie_detection_interval`).
The issue manifests when the scheduler finds the same TI again at a later interval, usually the very next one, before the initial purge completes, i.e. before the task is moved to any state other than `running`, which is one of the conditions for being detected as a TI without heartbeats. This behaviour was also described in #32289.
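To make the race concrete, here is a minimal, standalone sketch of the detection condition. This is not Airflow's actual implementation; the class, function, and constant names below are made up for illustration only. The point is that a dead TI keeps matching the zombie condition on every sweep for as long as its state is still `running`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

THRESHOLD = timedelta(seconds=10)   # stands in for scheduler_zombie_task_threshold
INTERVAL = timedelta(seconds=10)    # stands in for zombie_detection_interval

@dataclass
class TI:  # hypothetical, simplified stand-in for a task instance + its LocalTaskJob
    task_id: str
    try_number: int
    state: str                  # "running", "up_for_retry", "failed", ...
    latest_heartbeat: datetime  # heartbeat of the backing job

def find_zombies(tis: list[TI], now: datetime) -> list[TI]:
    # a TI is treated as a zombie while it is still "running" AND its heartbeat is stale
    limit = now - THRESHOLD
    return [ti for ti in tis if ti.state == "running" and ti.latest_heartbeat < limit]

# the worker died, so the heartbeat is frozen in the past
ti = TI("test", 1, "running", datetime.now(timezone.utc) - timedelta(seconds=30))
now = datetime.now(timezone.utc)

print(find_zombies([ti], now))             # sweep 1: zombie -> callback request queued
# the callback request is processed asynchronously by the DAG processor, so the state
# is still "running" at the next sweep and the very same attempt matches again
print(find_zombies([ti], now + INTERVAL))  # sweep 2: same attempt -> second callback request
```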
This happened to me when a TI with `on_failure_callback` and retries was found as a TI without heartbeats and was supposed to be marked as `up_for_retry`. However, before the task was actually marked as `up_for_retry`, it was found as a TI without heartbeats again, so TWO callbacks were requested. The TI then ran its next attempt but was marked as `failed` mid-execution.
This is a simplified version of the timeline for the above scenario.
- task starts 1st attempt but the command fails [TI state: `running`]
- task found as a TI without heartbeat, scheduler sends out a callback request [time x] [TI state: `running`]
- task found as a zombie again, scheduler sends out another callback request [time x+10] [TI state: `running`]
- first callback request processed [time x+11] [TI state: `up_for_retry`]
- task starts 2nd attempt [TI state: `running`]
- second callback request processed and downstream marked as `upstream_failed` [TI state: `failed`]
- task finishes and (due to task logic) is marked as skipped [TI state: `skipped`]
What you think should happen instead?
The scheduler sends the callback request to the DagFileProcessorManager through the executor. In Airflow 2.10.5, all callback requests sent this way are queued.
I think the issue is that the scheduler does not know whether a callback request for the same task attempt has already been sent, nor whether that request has completed. Without this information, the scheduler can send out more callbacks than necessary.
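One possible direction (just a sketch of the idea, not a concrete proposal against Airflow's internals; every name below is made up) would be for the scheduler to remember which task attempts it has already requested a zombie callback for, keyed by (dag_id, task_id, run_id, try_number), and skip duplicates until the attempt leaves the `running` state:

```python
from typing import NamedTuple

class AttemptKey(NamedTuple):
    dag_id: str
    task_id: str
    run_id: str
    try_number: int

# hypothetical in-memory guard; a real fix would likely have to live in the
# metadata DB so it survives scheduler restarts and works with HA schedulers
_requested_zombie_callbacks: set[AttemptKey] = set()

def should_request_callback(key: AttemptKey) -> bool:
    """Return True only the first time a zombie callback is requested for this attempt."""
    if key in _requested_zombie_callbacks:
        return False
    _requested_zombie_callbacks.add(key)
    return True

def on_attempt_resolved(key: AttemptKey) -> None:
    """Forget the attempt once the TI leaves the running state (e.g. up_for_retry/failed)."""
    _requested_zombie_callbacks.discard(key)
```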
How to reproduce
I was able to reproduce the issue by following the instructions in #32289, using a similar DAG.
Here is the Airflow configuration to help reproduce the issue.
```
ENV AIRFLOW__CORE__DAG_DIR_LIST_INTERVAL=5
ENV AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
ENV AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=150
ENV AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=10
ENV AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=10
```
Here is the DAG:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
import time
import logging
from datetime import datetime, timedelta

# module-level sleep: slows down parsing of this DAG file (each parse takes ~15 s, see the
# "took 15.351 seconds" log below), giving the scheduler time to detect the same zombie
# again before the first callback request is processed
time.sleep(15)

def sleep(**context):
    time.sleep(600)

def alert(context):
    logging.info(f"{context['task_instance'].task_id} failed")

with DAG(
    "my_dag",
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    max_active_runs=1,
    catchup=False
) as dag:
    task = PythonOperator(
        task_id="test",
        python_callable=sleep,
        retries=0,
        # retry quickly to ensure the task will be successful when the second clean up occurs
        retry_delay=timedelta(seconds=1),
        on_failure_callback=alert
    )
```
First, let the task run. The task is assigned to the worker `critical-eccentricity-9882-worker-default-55bdb445b8-7bb92`.
[2025-06-20T00:40:40.407+0000] {scheduler_job_runner.py:803} INFO - Setting external_id for <TaskInstance: my_dag.test scheduled__2025-06-19T00:00:00+00:00 [queued]> to ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08
...
[2025-06-20 00:40:40,407: INFO/ForkPoolWorker-6] [ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08] Executing command in Celery: ['airflow', 'tasks', 'run', 'my_dag', 'test', 'scheduled__2025-06-19T00:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/zombie.py']
Then kill the worker while the TI is executing. In this case, I abruptly deleted the worker pod at `2025-06-20T00:41:05`.
alan@Alans-MacBook-Pro test % kubectl delete pod critical-eccentricity-9882-worker-default-55bdb445b8-7bb92 --grace-period=0
pod "critical-eccentricity-9882-worker-default-55bdb445b8-7bb92" deleted
The scheduler detected the lack of heartbeat from the TI (LocalTaskJob). This is the FIRST time.
[2025-06-20T00:41:18.162+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:08.156992+00:00
[2025-06-20T00:41:18.162+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
The scheduler found the same TI without a heartbeat (zombie) again. This is the SECOND time.
[2025-06-20T00:41:28.230+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:18.224929+00:00
[2025-06-20T00:41:28.230+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-06-20T00:41:28.898+0000] {scheduler_job_runner.py:813} INFO - TaskInstance Finished: dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, map_index=-1, run_start_date=2025-06-20 00:40:55.552863+00:00, run_end_date=None, run_duration=None, state=running, executor=CeleryExecutor(parallelism=100), executor_state=failed, try_number=1, max_tries=0, job_id=185, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-06-20 00:40:40.273765+00:00, queued_by_job_id=183, pid=59
A DAG file processor process began processing the source file `/usr/local/airflow/dags/zombie.py`.
[2025-06-20T00:41:28.253+0000] [SOURCE:DAG_PROCESSOR] {processor.py:914} INFO - Processing file /usr/local/airflow/dags/zombie.py for tasks to queue
[2025-06-20T00:41:28.255+0000] [SOURCE:DAG_PROCESSOR] {dagbag.py:588} INFO - Filling up the DagBag from /usr/local/airflow/dags/zombie.py
The scheduler found the same TI without a heartbeat (zombie) again. This is the THIRD time.
[2025-06-20T00:41:38.250+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:28.245918+00:00
[2025-06-20T00:41:38.251+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-06-20T00:41:39.052+0000] {scheduler_job_runner.py:813} INFO - TaskInstance Finished: dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, map_index=-1, run_start_date=2025-06-20 00:40:55.552863+00:00, run_end_date=None, run_duration=None, state=running, executor=CeleryExecutor(parallelism=100), executor_state=failed, try_number=1, max_tries=0, job_id=185, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-06-20 00:40:40.273765+00:00, queued_by_job_id=183, pid=59
Processing finished and the process executed the callback requests from the first and second zombie purging attempts.
[2025-06-20T00:41:43.261+0000] [SOURCE:DAG_PROCESSOR] {processor.py:925} INFO - DAG(s) 'my_dag' retrieved from /usr/local/airflow/dags/zombie.py
[2025-06-20T00:41:43.406+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004143
[2025-06-20T00:41:43.406+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
[2025-06-20T00:41:43.517+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004143
[2025-06-20T00:41:43.517+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
[2025-06-20T00:41:43.599+0000] [SOURCE:DAG_PROCESSOR] {processor.py:208} INFO - Processing /usr/local/airflow/dags/zombie.py took 15.351 seconds
Another DAG file processor process began processing the source file `/usr/local/airflow/dags/zombie.py`.
[2025-06-20T00:41:53.832+0000] [SOURCE:DAG_PROCESSOR] {processor.py:186} INFO - Started process (PID=149) to work on /usr/local/airflow/dags/zombie.py
[2025-06-20T00:41:53.833+0000] [SOURCE:DAG_PROCESSOR] {processor.py:914} INFO - Processing file /usr/local/airflow/dags/zombie.py for tasks to queue
Processing finished and the process executed the callback request from the third zombie purging attempt.
[2025-06-20T00:42:08.840+0000] [SOURCE:DAG_PROCESSOR] {processor.py:925} INFO - DAG(s) 'my_dag' retrieved from /usr/local/airflow/dags/zombie.py
[2025-06-20T00:42:08.956+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004208
[2025-06-20T00:42:08.956+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
There should NOT be three `on_failure_callback` executions for one attempt of the same task instance.
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct