Skip to content

Commit f860141

Browse files
authored
respect "soft_fail" argument when running BatchSensor in deferrable mode (#33405)
1 parent f657863 commit f860141

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

airflow/providers/amazon/aws/sensors/batch.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from deprecated import deprecated
2424

2525
from airflow.configuration import conf
26-
from airflow.exceptions import AirflowException
26+
from airflow.exceptions import AirflowException, AirflowSkipException
2727
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
2828
from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger
2929
from airflow.sensors.base import BaseSensorOperator
@@ -115,7 +115,12 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
115115
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
116116
"""
117117
if event["status"] != "success":
118-
raise AirflowException(f"Error while running job: {event}")
118+
message = f"Error while running job: {event}"
119+
# TODO: remove this if-else block when min_airflow_version is set to higher than the version that
120+
# changed in https://github.com/apache/airflow/pull/33424 is released
121+
if self.soft_fail:
122+
raise AirflowSkipException(message)
123+
raise AirflowException(message)
119124
job_id = event["job_id"]
120125
self.log.info("Batch Job %s complete", job_id)
121126

tests/providers/amazon/aws/sensors/test_batch.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import pytest
2222

23-
from airflow.exceptions import AirflowException, TaskDeferred
23+
from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
2424
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
2525
from airflow.providers.amazon.aws.sensors.batch import (
2626
BatchComputeEnvironmentSensor,
@@ -100,6 +100,12 @@ def test_execute_failure_in_deferrable_mode(self, deferrable_batch_sensor: Batch
100100
with pytest.raises(AirflowException):
101101
deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"})
102102

103+
def test_execute_failure_in_deferrable_mode_with_soft_fail(self, deferrable_batch_sensor: BatchSensor):
104+
"""Tests that an AirflowSkipException is raised in case of error event and soft_fail is set to True"""
105+
deferrable_batch_sensor.soft_fail = True
106+
with pytest.raises(AirflowSkipException):
107+
deferrable_batch_sensor.execute_complete(context={}, event={"status": "failure"})
108+
103109

104110
@pytest.fixture(scope="module")
105111
def batch_compute_environment_sensor() -> BatchComputeEnvironmentSensor:

0 commit comments

Comments
 (0)