Skip to content

Add back invalid inlet and outlet check before running tasks #50773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented May 19, 2025

Why

Invalid inlets and outlets (assets with conflicting names or URIs) were removed during the recent update to the task run.

The original issue #50654 occurred because the EmptyOperator skipped the task instance run, which led to an attempt to update the asset event in ti_update_state even if it came with invalid inlets or outlets. This caused the AirflowInactiveAssetInInletOrOutletException to be raised, resulting in the API server crashing.

What

  1. Reintroduce the inlets and outlets along with the validation logic, and implement it using a task SDK approach. Additionally, perform checks for EmptyOperators that have inlets or outlets.
  2. Implement a guard for the AirflowInactiveAssetInInletOrOutletException in ti_update_state to protect against the lesser-seen case where the asset becomes inactive during the execution of the task.
    closes: Task is taking more than 5 minutes to fail #50654

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch 3 times, most recently from 128b311 to ba7c5a5 Compare May 26, 2025 12:56
@Lee-W Lee-W changed the title feat: fail a task in scheduling phase if there's an inactive inlet or outlet Add back invalid inlet and outlet check before running tasks May 26, 2025
@Lee-W
Copy link
Member Author

Lee-W commented May 26, 2025

The logic is ready, but I still need some test cases and code polish. I would appreciate an early review. If not, I think I'll be able to wrap it up tomorrow

@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch from ba7c5a5 to 426f6ac Compare May 27, 2025 08:29
@uranusjr
Copy link
Member

This does not explain why the task takes five minutes to fail. It avoids the task to run altogether, but masks the underlying issue (still unknown) why the task fails so slowly.

@Lee-W
Copy link
Member Author

Lee-W commented May 27, 2025

This does not explain why the task takes five minutes to fail. It avoids the task to run altogether, but masks the underlying issue (still unknown) why the task fails so slowly.

The task execution was skipped as it's an empty operator, and the checking at a later stage in the task SDK was not handled, which breaks the API server. I think why it fails slow is due to the broken API server

@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch 2 times, most recently from 02cdfa4 to bf285d0 Compare May 27, 2025 13:50
@Lee-W Lee-W marked this pull request as ready for review May 27, 2025 14:18
@Lee-W
Copy link
Member Author

Lee-W commented May 27, 2025

This does not explain why the task takes five minutes to fail. It avoids the task to run altogether, but masks the underlying issue (still unknown) why the task fails so slowly.

The task execution was skipped as it's an empty operator, and the checking at a later stage in the task SDK was not handled, which breaks the API server. I think why it fails slow is due to the broken API server

This is now guarded as well in case assets somehow become inactive during task execution

@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch 3 times, most recently from 9bd8a66 to 196a121 Compare May 29, 2025 08:52
@vatsrahul1001 vatsrahul1001 added this to the Airflow 3.0.2 milestone Jun 1, 2025
@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch 5 times, most recently from f0edff8 to 81b000a Compare June 3, 2025 02:21
Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking almost fine, some comments

@Lee-W Lee-W force-pushed the raise-asset-inactive-warning-even-earlier branch from 4ee2f65 to 05b2bb8 Compare June 3, 2025 06:46
@Lee-W Lee-W requested a review from amoghrajesh June 3, 2025 06:46
@Lee-W
Copy link
Member Author

Lee-W commented Jun 3, 2025

@amoghrajesh addressed all the comments. Could you please check again? Thanks!

Copy link
Contributor

@amoghrajesh amoghrajesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1
@kaxil WDYT?

@Lee-W
Copy link
Member Author

Lee-W commented Jun 3, 2025

As it has been approved and we're about to start work on releasing 3.0.2, I will merge this one.

@Lee-W Lee-W merged commit 083e03a into apache:main Jun 3, 2025
97 checks passed
@Lee-W Lee-W deleted the raise-asset-inactive-warning-even-earlier branch June 3, 2025 08:35
kaxil pushed a commit that referenced this pull request Jun 3, 2025
* feat(task-sdk): check invalid inlets or outlets before running tasks

* test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets

* test(task_instances): add test cases TestInvalidInletsAndOutlets

* Revert "test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets"

This reverts commit 5f6956d.

* feat(task_runner): early inlets and outlets check

* test(task_runner): fix asset inlet outlets tests

* test(asset): add test cases to AssetUniqueKey

* fix(task_instances): guard AirflowInactiveAssetInInletOrOutletException in ti_update_state

As we already check before scheduling, it's should normally not happen. Unless the asset become invalid after task succeeded,
which is not something expected to happen

* refactor: replace invalid with inactive

* refactor(task_instance): update exception

* test(task_runner): improve mocking check

* test(supervisor): improve test_handle_requests

(cherry picked from commit 083e03a)
kaxil pushed a commit that referenced this pull request Jun 3, 2025
* feat(task-sdk): check invalid inlets or outlets before running tasks

* test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets

* test(task_instances): add test cases TestInvalidInletsAndOutlets

* Revert "test(pytest_plugin): extend mock_supervisor_comms to ignore invalid assets in inlets and outlets"

This reverts commit 5f6956d.

* feat(task_runner): early inlets and outlets check

* test(task_runner): fix asset inlet outlets tests

* test(asset): add test cases to AssetUniqueKey

* fix(task_instances): guard AirflowInactiveAssetInInletOrOutletException in ti_update_state

As we already check before scheduling, it's should normally not happen. Unless the asset become invalid after task succeeded,
which is not something expected to happen

* refactor: replace invalid with inactive

* refactor(task_instance): update exception

* test(task_runner): improve mocking check

* test(supervisor): improve test_handle_requests

(cherry picked from commit 083e03a)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Task is taking more than 5 minutes to fail
5 participants