-
Notifications
You must be signed in to change notification settings - Fork 15.1k
Add back invalid inlet and outlet check before running tasks #50773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add back invalid inlet and outlet check before running tasks #50773
Conversation
128b311
to
ba7c5a5
Compare
The logic is ready, but I still need some test cases and code polish. I would appreciate an early review. If not, I think I'll be able to wrap it up tomorrow |
ba7c5a5
to
426f6ac
Compare
This does not explain why the task takes five minutes to fail. It avoids the task to run altogether, but masks the underlying issue (still unknown) why the task fails so slowly. |
The task execution was skipped as it's an empty operator, and the checking at a later stage in the task SDK was not handled, which breaks the API server. I think why it fails slow is due to the broken API server |
02cdfa4
to
bf285d0
Compare
This is now guarded as well in case assets somehow become inactive during task execution |
9bd8a66
to
196a121
Compare
f0edff8
to
81b000a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking almost fine, some comments
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Outdated
Show resolved
Hide resolved
…ssets in inlets and outlets
…nvalid assets in inlets and outlets" This reverts commit 5f6956d.
…on in ti_update_state As we already check before scheduling, it's should normally not happen. Unless the asset become invalid after task succeeded, which is not something expected to happen
4ee2f65
to
05b2bb8
Compare
@amoghrajesh addressed all the comments. Could you please check again? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
@kaxil WDYT?
As it has been approved and we're about to start work on releasing 3.0.2, I will merge this one. |
* feat(task-sdk): check invalid inlets or outlets before running tasks * test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets * test(task_instances): add test cases TestInvalidInletsAndOutlets * Revert "test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets" This reverts commit 5f6956d. * feat(task_runner): early inlets and outlets check * test(task_runner): fix asset inlet outlets tests * test(asset): add test cases to AssetUniqueKey * fix(task_instances): guard AirflowInactiveAssetInInletOrOutletException in ti_update_state As we already check before scheduling, it's should normally not happen. Unless the asset become invalid after task succeeded, which is not something expected to happen * refactor: replace invalid with inactive * refactor(task_instance): update exception * test(task_runner): improve mocking check * test(supervisor): improve test_handle_requests (cherry picked from commit 083e03a)
* feat(task-sdk): check invalid inlets or outlets before running tasks * test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets * test(task_instances): add test cases TestInvalidInletsAndOutlets * Revert "test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets" This reverts commit 5f6956d. * feat(task_runner): early inlets and outlets check * test(task_runner): fix asset inlet outlets tests * test(asset): add test cases to AssetUniqueKey * fix(task_instances): guard AirflowInactiveAssetInInletOrOutletException in ti_update_state As we already check before scheduling, it's should normally not happen. Unless the asset become invalid after task succeeded, which is not something expected to happen * refactor: replace invalid with inactive * refactor(task_instance): update exception * test(task_runner): improve mocking check * test(supervisor): improve test_handle_requests (cherry picked from commit 083e03a)
Why
Invalid inlets and outlets (assets with conflicting names or URIs) were removed during the recent update to the task run.
The original issue #50654 occurred because the EmptyOperator skipped the task instance run, which led to an attempt to update the asset event in
ti_update_state
even if it came with invalid inlets or outlets. This caused theAirflowInactiveAssetInInletOrOutletException
to be raised, resulting in the API server crashing.What
AirflowInactiveAssetInInletOrOutletException
inti_update_state
to protect against the lesser-seen case where the asset becomes inactive during the execution of the task.closes: Task is taking more than 5 minutes to fail #50654
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in airflow-core/newsfragments.