Skip to content

Commit 4eb9ffe

Browse files
authored
Optimize example DAG for EmrJobFlowSensorAsync and EmrStepSensorAsync (#203)
1 parent 67d1ac5 commit 4eb9ffe

File tree

2 files changed

+16
-91
lines changed

2 files changed

+16
-91
lines changed

astronomer/providers/amazon/aws/example_dags/example_emr_job_flow.py

Lines changed: 0 additions & 82 deletions
This file was deleted.

astronomer/providers/amazon/aws/example_dags/example_emr_step_sensor.py renamed to astronomer/providers/amazon/aws/example_dags/example_emr_sensor.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
from datetime import datetime, timedelta
55

66
from airflow import DAG
7-
from airflow.models.baseoperator import chain
87
from airflow.providers.amazon.aws.operators.emr import (
98
EmrAddStepsOperator,
109
EmrCreateJobFlowOperator,
1110
EmrTerminateJobFlowOperator,
1211
)
1312

14-
from astronomer.providers.amazon.aws.sensors.emr import EmrStepSensorAsync
13+
from astronomer.providers.amazon.aws.sensors.emr import (
14+
EmrJobFlowSensorAsync,
15+
EmrStepSensorAsync,
16+
)
1517

1618
JOB_FLOW_ROLE = os.getenv("EMR_JOB_FLOW_ROLE", "EMR_EC2_DefaultRole")
1719
SERVICE_ROLE = os.getenv("EMR_SERVICE_ROLE", "EMR_DefaultRole")
@@ -55,20 +57,22 @@
5557
"execution_timeout": timedelta(minutes=30),
5658
}
5759

60+
5861
with DAG(
59-
dag_id="example_emr_step_sensor",
62+
dag_id="example_emr_sensor",
6063
schedule_interval=None,
6164
start_date=datetime(2022, 1, 1),
6265
default_args=DEFAULT_ARGS,
6366
tags=["example", "async", "emr"],
6467
catchup=False,
6568
) as dag:
66-
69+
# [START howto_operator_emr_create_job_flow_steps_tasks]
6770
cluster_creator = EmrCreateJobFlowOperator(
6871
task_id="create_job_flow",
6972
job_flow_overrides=JOB_FLOW_OVERRIDES,
7073
emr_conn_id=EMR_CONN_ID,
7174
)
75+
# [END howto_operator_emr_create_job_flow_steps_tasks]
7276

7377
# [START howto_operator_emr_add_steps]
7478
step_adder = EmrAddStepsOperator(
@@ -79,6 +83,12 @@
7983
)
8084
# [END howto_operator_emr_add_steps]
8185

86+
# [START howto_sensor_emr_job_flow_sensor_async]
87+
job_flow_sensor = EmrJobFlowSensorAsync(
88+
task_id="job_flow_sensor", job_flow_id=cluster_creator.output, aws_conn_id=AWS_CONN_ID
89+
)
90+
# [END howto_sensor_emr_job_flow_sensor_async]
91+
8292
# [START howto_sensor_emr_step_sensor_async]
8393
"""
8494
Defer and poll until it reaches the target state
@@ -103,8 +113,5 @@
103113
)
104114
# [END howto_operator_emr_terminate_job_flow]
105115

106-
chain(
107-
step_adder,
108-
step_checker,
109-
cluster_remover,
110-
)
116+
[job_flow_sensor, step_checker] >> cluster_remover
117+
step_adder >> step_checker

0 commit comments

Comments
 (0)