Skip to content

Commit 23d5076

Browse files
authored
openlineage: defensively check for provided datetimes in listener (#33343)
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 699b3f4 commit 23d5076

File tree

2 files changed

+23
-9
lines changed

2 files changed

+23
-9
lines changed

airflow/providers/openlineage/plugins/adapter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ def start_task(
127127
parent_job_name: str | None,
128128
parent_run_id: str | None,
129129
code_location: str | None,
130-
nominal_start_time: str,
131-
nominal_end_time: str,
130+
nominal_start_time: str | None,
131+
nominal_end_time: str | None,
132132
owners: list[str],
133133
task: OperatorLineage | None,
134134
run_facets: dict[str, BaseFacet] | None = None, # Custom run facets

airflow/providers/openlineage/plugins/listener.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import logging
2020
from concurrent.futures import Executor, ThreadPoolExecutor
21+
from datetime import datetime
2122
from typing import TYPE_CHECKING
2223

2324
from airflow.listeners import hookimpl
@@ -76,16 +77,22 @@ def on_running():
7677

7778
task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
7879

80+
start_date = task_instance.start_date if task_instance.start_date else datetime.now()
81+
data_interval_start = (
82+
dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
83+
)
84+
data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None
85+
7986
self.adapter.start_task(
8087
run_id=task_uuid,
8188
job_name=get_job_name(task),
8289
job_description=dag.description,
83-
event_time=task_instance.start_date.isoformat(),
90+
event_time=start_date.isoformat(),
8491
parent_job_name=dag.dag_id,
8592
parent_run_id=parent_run_id,
8693
code_location=None,
87-
nominal_start_time=dagrun.data_interval_start.isoformat(),
88-
nominal_end_time=dagrun.data_interval_end.isoformat(),
94+
nominal_start_time=data_interval_start,
95+
nominal_end_time=data_interval_end,
8996
owners=dag.owner.split(", "),
9097
task=task_metadata,
9198
run_facets={
@@ -113,10 +120,13 @@ def on_success():
113120
task_metadata = self.extractor_manager.extract_metadata(
114121
dagrun, task, complete=True, task_instance=task_instance
115122
)
123+
124+
end_date = task_instance.end_date if task_instance.end_date else datetime.now()
125+
116126
self.adapter.complete_task(
117127
run_id=task_uuid,
118128
job_name=get_job_name(task),
119-
end_time=task_instance.end_date.isoformat(),
129+
end_time=end_date.isoformat(),
120130
task=task_metadata,
121131
)
122132

@@ -139,10 +149,12 @@ def on_failure():
139149
dagrun, task, complete=True, task_instance=task_instance
140150
)
141151

152+
end_date = task_instance.end_date if task_instance.end_date else datetime.now()
153+
142154
self.adapter.fail_task(
143155
run_id=task_uuid,
144156
job_name=get_job_name(task),
145-
end_time=task_instance.end_date.isoformat(),
157+
end_time=end_date.isoformat(),
146158
task=task_metadata,
147159
)
148160

@@ -165,12 +177,14 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
165177
if not self.executor:
166178
self.log.error("Executor have not started before `on_dag_run_running`")
167179
return
180+
data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
181+
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
168182
self.executor.submit(
169183
self.adapter.dag_started,
170184
dag_run=dag_run,
171185
msg=msg,
172-
nominal_start_time=dag_run.data_interval_start.isoformat(),
173-
nominal_end_time=dag_run.data_interval_end.isoformat(),
186+
nominal_start_time=data_interval_start,
187+
nominal_end_time=data_interval_end,
174188
)
175189

176190
@hookimpl

0 commit comments

Comments
 (0)