Skip to content

Commit e507e85

Browse files
committed
Add Airflow version in the Slack report for master DAG run
Use a BashOperator to get the airflow version and pushes the result to XCOM. Pass `context` to Slack report task so that it can pull the airflow version from XCOM pushed by the BashOperator.
1 parent 782bf34 commit e507e85

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

.circleci/integration-tests/master_dag.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
import time
44
from datetime import datetime
5-
from typing import List
5+
from typing import Any, List
66

77
from airflow import DAG
88
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
@@ -19,7 +19,7 @@
1919
SLACK_USERNAME = os.getenv("SLACK_USERNAME", "airflow_app")
2020

2121

22-
def get_report(dag_run_ids: List[str]) -> None:
22+
def get_report(dag_run_ids: List[str], **context: Any) -> None:
2323
"""Fetch dags run details and generate report"""
2424
with create_session() as session:
2525
last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all()
@@ -39,6 +39,10 @@ def get_report(dag_run_ids: List[str]) -> None:
3939
task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n"
4040
message_list.append(task_message_str)
4141

42+
airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version")
43+
airflow_version_message = f"\n\nAirflow version for the run is `{airflow_version}`"
44+
message_list.append(airflow_version_message)
45+
4246
logging.info("%s", "".join(message_list))
4347
# Send dag run report on Slack
4448
try:
@@ -94,6 +98,10 @@ def prepare_dag_dependency(task_info, execution_time):
9498
task_id="list_installed_pip_packages", bash_command="pip freeze"
9599
)
96100

101+
get_airflow_version = BashOperator(
102+
task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True
103+
)
104+
97105
dag_run_ids = []
98106
# AWS S3 and Redshift DAG
99107
amazon_task_info = [
@@ -194,6 +202,7 @@ def prepare_dag_dependency(task_info, execution_time):
194202
python_callable=get_report,
195203
op_kwargs={"dag_run_ids": dag_run_ids},
196204
trigger_rule="all_done",
205+
provide_context=True,
197206
)
198207

199208
end = DummyOperator(
@@ -203,6 +212,7 @@ def prepare_dag_dependency(task_info, execution_time):
203212

204213
start >> [
205214
list_installed_pip_packages,
215+
get_airflow_version,
206216
amazon_trigger_tasks[0],
207217
emr_trigger_tasks[0],
208218
google_trigger_tasks[0],
@@ -218,6 +228,7 @@ def prepare_dag_dependency(task_info, execution_time):
218228

219229
last_task = [
220230
list_installed_pip_packages,
231+
get_airflow_version,
221232
amazon_trigger_tasks[-1],
222233
emr_trigger_tasks[-1],
223234
google_trigger_tasks[-1],

0 commit comments

Comments
 (0)