Skip to content

Commit 36cb4f3

Browse files
authored
bug fix: DateTimeSensor can't render jinja template if use native obj (apache#50744)
* fix bug: can't render jinja template if use native obj * re-order `from airflow.utils import timezone` * make `_moment` as a property * fix unit test * remove the unnecessary pendulum object
1 parent 374f46a commit 36cb4f3

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

providers/standard/src/airflow/providers/standard/sensors/date_time.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
2626
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
2727
from airflow.sensors.base import BaseSensorOperator
28+
from airflow.utils import timezone
2829

2930
try:
3031
from airflow.triggers.base import StartTriggerArgs
@@ -41,8 +42,6 @@ class StartTriggerArgs: # type: ignore[no-redef]
4142
timeout: datetime.timedelta | None = None
4243

4344

44-
from airflow.utils import timezone
45-
4645
if TYPE_CHECKING:
4746
try:
4847
from airflow.sdk.definitions.context import Context
@@ -99,6 +98,13 @@ def poke(self, context: Context) -> bool:
9998
self.log.info("Checking if the time (%s) has come", self.target_time)
10099
return timezone.utcnow() > timezone.parse(self.target_time)
101100

101+
@property
102+
def _moment(self) -> datetime.datetime:
103+
if isinstance(self.target_time, datetime.datetime):
104+
return self.target_time
105+
106+
return timezone.parse(self.target_time)
107+
102108

103109
class DateTimeSensorAsync(DateTimeSensor):
104110
"""
@@ -145,11 +151,11 @@ def execute(self, context: Context) -> NoReturn:
145151
self.defer(
146152
method_name="execute_complete",
147153
trigger=DateTimeTrigger(
148-
moment=timezone.parse(self.target_time),
154+
moment=self._moment,
149155
end_from_trigger=self.end_from_trigger,
150156
)
151157
if AIRFLOW_V_3_0_PLUS
152-
else DateTimeTrigger(moment=timezone.parse(self.target_time)),
158+
else DateTimeTrigger(moment=self._moment),
153159
)
154160

155161
def execute_complete(self, context: Context, event: Any = None) -> None:

providers/standard/tests/unit/standard/sensors/test_date_time.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
from unittest.mock import patch
2121

22+
import pendulum
2223
import pytest
2324

25+
from airflow import macros
2426
from airflow.models.dag import DAG
2527
from airflow.providers.standard.sensors.date_time import DateTimeSensor
2628
from airflow.utils import timezone
@@ -90,3 +92,34 @@ def test_invalid_input(self):
9092
def test_poke(self, mock_utcnow, task_id, target_time, expected):
9193
op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
9294
assert op.poke(None) == expected
95+
96+
@pytest.mark.parametrize(
97+
"native, target_time, expected_type",
98+
[
99+
(False, "2025-01-01T00:00:00+00:00", pendulum.DateTime),
100+
(True, "{{ data_interval_end }}", pendulum.DateTime),
101+
(False, pendulum.datetime(2025, 1, 1, tz="UTC"), pendulum.DateTime),
102+
],
103+
)
104+
def test_moment(self, native, target_time, expected_type):
105+
dag = DAG(
106+
dag_id="moment_dag",
107+
start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
108+
schedule=None,
109+
render_template_as_native_obj=native,
110+
)
111+
112+
sensor = DateTimeSensor(
113+
task_id="moment",
114+
target_time=target_time,
115+
dag=dag,
116+
)
117+
118+
ctx = {
119+
"data_interval_end": pendulum.datetime(2025, 1, 1, tz="UTC"),
120+
"macros": macros,
121+
"dag": dag,
122+
}
123+
sensor.render_template_fields(ctx)
124+
125+
assert isinstance(sensor._moment, expected_type)

0 commit comments

Comments
 (0)