Skip to content

respect "soft_fail" argument when running BatchSensor in deferrable mode #33405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions airflow/providers/amazon/aws/sensors/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger
from airflow.sensors.base import BaseSensorOperator
Expand Down Expand Up @@ -115,7 +115,12 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event["status"] != "success":
raise AirflowException(f"Error while running job: {event}")
message = f"Error while running job: {event}"
# TODO: remove this if-else block when min_airflow_version is set to higher than the version that
# changed in https://github.com/apache/airflow/pull/33424 is released
Comment on lines +119 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if-else block are you referring to?

if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
job_id = event["job_id"]
self.log.info("Batch Job %s complete", job_id)

Expand Down
8 changes: 7 additions & 1 deletion tests/providers/amazon/aws/sensors/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import pytest

from airflow.exceptions import AirflowException, TaskDeferred
from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.sensors.batch import (
BatchComputeEnvironmentSensor,
Expand Down Expand Up @@ -100,6 +100,12 @@ def test_execute_failure_in_deferrable_mode(self, deferrable_batch_sensor: Batch
with pytest.raises(AirflowException):
deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"})

def test_execute_failure_in_deferrable_mode_with_soft_fail(self, deferrable_batch_sensor: BatchSensor):
"""Tests that an AirflowSkipException is raised in case of error event and soft_fail is set to True"""
deferrable_batch_sensor.soft_fail = True
with pytest.raises(AirflowSkipException):
deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"})


@pytest.fixture(scope="module")
def batch_compute_environment_sensor() -> BatchComputeEnvironmentSensor:
Expand Down