From e507e85f7f3ebe32527388d0d7f4fec56b93ef92 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 14 Jul 2022 17:35:16 +0530 Subject: [PATCH 1/3] Add Airflow version in the Slack report for master DAG run Use a BashOperator to get the airflow version and pushes the result to XCOM. Pass `context` to Slack report task so that it can pull the airflow version from XCOM pushed by the BashOperator. --- .circleci/integration-tests/master_dag.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/.circleci/integration-tests/master_dag.py b/.circleci/integration-tests/master_dag.py index 331826a4b..a96b29a21 100644 --- a/.circleci/integration-tests/master_dag.py +++ b/.circleci/integration-tests/master_dag.py @@ -2,7 +2,7 @@ import os import time from datetime import datetime -from typing import List +from typing import Any, List from airflow import DAG from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator @@ -19,7 +19,7 @@ SLACK_USERNAME = os.getenv("SLACK_USERNAME", "airflow_app") -def get_report(dag_run_ids: List[str]) -> None: +def get_report(dag_run_ids: List[str], **context: Any) -> None: """Fetch dags run details and generate report""" with create_session() as session: last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all() @@ -39,6 +39,10 @@ def get_report(dag_run_ids: List[str]) -> None: task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n" message_list.append(task_message_str) + airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version") + airflow_version_message = f"\n\nAirflow version for the run is `{airflow_version}`" + message_list.append(airflow_version_message) + logging.info("%s", "".join(message_list)) # Send dag run report on Slack try: @@ -94,6 +98,10 @@ def prepare_dag_dependency(task_info, execution_time): task_id="list_installed_pip_packages", bash_command="pip freeze" ) + get_airflow_version = BashOperator( + task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True + ) + dag_run_ids = [] # AWS S3 and Redshift DAG amazon_task_info = [ @@ -194,6 +202,7 @@ def prepare_dag_dependency(task_info, execution_time): python_callable=get_report, op_kwargs={"dag_run_ids": dag_run_ids}, trigger_rule="all_done", + provide_context=True, ) end = DummyOperator( @@ -203,6 +212,7 @@ def prepare_dag_dependency(task_info, execution_time): start >> [ list_installed_pip_packages, + get_airflow_version, amazon_trigger_tasks[0], emr_trigger_tasks[0], google_trigger_tasks[0], @@ -218,6 +228,7 @@ def prepare_dag_dependency(task_info, execution_time): last_task = [ list_installed_pip_packages, + get_airflow_version, amazon_trigger_tasks[-1], emr_trigger_tasks[-1], google_trigger_tasks[-1], From 87010f497da1f6dfd99597fd53f884a507743a9c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 14 Jul 2022 18:24:26 +0530 Subject: [PATCH 2/3] Include Airflow version as first line of the report --- .circleci/integration-tests/master_dag.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.circleci/integration-tests/master_dag.py b/.circleci/integration-tests/master_dag.py index a96b29a21..7c6c1b218 100644 --- a/.circleci/integration-tests/master_dag.py +++ b/.circleci/integration-tests/master_dag.py @@ -24,6 +24,11 @@ def get_report(dag_run_ids: List[str], **context: Any) -> None: with create_session() as session: last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all() message_list: List[str] = [] + + airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version") + airflow_version_message = f"Airflow version for the run is `{airflow_version}` \n\n" + message_list.append(airflow_version_message) + for dr in last_dags_runs: dr_status = f" *{dr.dag_id} : {dr.get_state()}* \n" message_list.append(dr_status) @@ -39,10 +44,6 @@ def get_report(dag_run_ids: List[str], **context: Any) -> None: task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n" message_list.append(task_message_str) - airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version") - airflow_version_message = f"\n\nAirflow version for the run is `{airflow_version}`" - message_list.append(airflow_version_message) - logging.info("%s", "".join(message_list)) # Send dag run report on Slack try: From f9ca41f4d91cf1c498ac38fb409c62acf010aca7 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 14 Jul 2022 18:26:26 +0530 Subject: [PATCH 3/3] Update .circleci/integration-tests/master_dag.py --- .circleci/integration-tests/master_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/integration-tests/master_dag.py b/.circleci/integration-tests/master_dag.py index 7c6c1b218..bb7d2197f 100644 --- a/.circleci/integration-tests/master_dag.py +++ b/.circleci/integration-tests/master_dag.py @@ -26,7 +26,7 @@ def get_report(dag_run_ids: List[str], **context: Any) -> None: message_list: List[str] = [] airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version") - airflow_version_message = f"Airflow version for the run is `{airflow_version}` \n\n" + airflow_version_message = f"Airflow version for the below run is `{airflow_version}` \n\n" message_list.append(airflow_version_message) for dr in last_dags_runs: