Skip to content

Commit 59c8b7c

Browse files
Example DAG for EmrContainerSensorAsync(#242)
1 parent af9251c commit 59c8b7c

File tree

6 files changed

+222
-86
lines changed

6 files changed

+222
-86
lines changed

.circleci/integration-tests/Dockerfile

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,18 @@ RUN apt-get update -y \
1010
&& rm -rf /var/lib/apt/lists/*
1111

1212
# install AWS CLI
13-
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
14-
RUN unzip awscliv2.zip
15-
RUN ./aws/install
13+
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
14+
&& unzip awscliv2.zip \
15+
&& ./aws/install
16+
17+
# install eksctl
18+
RUN curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp \
19+
&& mv /tmp/eksctl /usr/local/bin
20+
21+
# install kubectl
22+
RUN curl -o kubectl https://s3.us-west-2.amazonaws.com/amazon-eks/1.22.6/2022-03-09/bin/linux/amd64/kubectl \
23+
&& chmod +x ./kubectl \
24+
&& mv ./kubectl /usr/local/bin
1625

1726
COPY astronomer-providers /tmp/astronomer-providers
1827
RUN pip install /tmp/astronomer-providers[all]
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#!/bin/bash
#
# Provision the AWS resources used by the EMR-on-EKS example: an EKS cluster,
# a Kubernetes namespace with an identity mapping for the "emr-containers"
# service, and an IAM job execution role with its policies attached.
#
# Reads from the environment:
#   EKS_CONTAINER_PROVIDER_CLUSTER_NAME, AWS_DEFAULT_REGION, INSTANCE_TYPE,
#   KUBECTL_CLUSTER_NAME, JOB_EXECUTION_ROLE, AWS_ACCOUNT_ID,
#   DEBUGGING_MONITORING_POLICY, CONTAINER_SUBMIT_JOB_POLICY,
#   JOB_EXECUTION_POLICY, MANAGE_VIRTUAL_CLUSTERS
#
# Every expansion is double-quoted so that a value containing whitespace or
# glob characters reaches the CLI as exactly one argument. Only POSIX sh
# constructs are used, so the script behaves the same under `sh` and `bash`.

# create cluster
eksctl create cluster \
    --name "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME" \
    --region "$AWS_DEFAULT_REGION" \
    --with-oidc \
    --ssh-access \
    --ssh-public-key providers_team_keypair \
    --instance-types="$INSTANCE_TYPE" \
    --managed

# create kubectl cluster namespace
kubectl create namespace "$KUBECTL_CLUSTER_NAME"

# map the emr-containers service identity onto that namespace
eksctl create iamidentitymapping \
    --cluster "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME" \
    --namespace "$KUBECTL_CLUSTER_NAME" \
    --service-name "emr-containers"

# print the cluster's OIDC issuer, then associate the IAM OIDC provider
aws eks describe-cluster --name "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME" --query "cluster.identity.oidc.issuer"

eksctl utils associate-iam-oidc-provider --cluster "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME" --approve

# create the job execution role; the trust policy lets the account root assume it
aws iam create-role --role-name "$JOB_EXECUTION_ROLE" --assume-role-policy-document '{"Version": "2012-10-17","Statement":
[{"Effect": "Allow","Principal": {"AWS": "arn:aws:iam::'"$AWS_ACCOUNT_ID"':root"},"Action":
"sts:AssumeRole","Condition": {}}]}'

# attach the four account-local policies to the role, in the original order
for policy_name in \
    "$DEBUGGING_MONITORING_POLICY" \
    "$CONTAINER_SUBMIT_JOB_POLICY" \
    "$JOB_EXECUTION_POLICY" \
    "$MANAGE_VIRTUAL_CLUSTERS"
do
    aws iam attach-role-policy \
        --role-name "$JOB_EXECUTION_ROLE" \
        --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/${policy_name}"
done

# let EMR on EKS jobs in the namespace assume the role
aws emr-containers update-role-trust-policy \
    --cluster-name "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME" \
    --namespace "$KUBECTL_CLUSTER_NAME" \
    --role-name "$JOB_EXECUTION_ROLE"

# NOTE: an export only affects this shell and the processes it starts; a
# caller that runs this script as a child process will not see JOB_ROLE_ARN.
export JOB_ROLE_ARN="arn:aws:iam::${AWS_ACCOUNT_ID}:role/${JOB_EXECUTION_ROLE}"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/bin/bash
#
# Tear down the resources of the EMR-on-EKS example: detach the policies from
# the job execution role, delete the role, delete the EMR virtual cluster and
# finally delete the EKS cluster.
#
# Reads from the environment:
#   JOB_EXECUTION_ROLE, AWS_ACCOUNT_ID, DEBUGGING_MONITORING_POLICY,
#   CONTAINER_SUBMIT_JOB_POLICY, JOB_EXECUTION_POLICY, MANAGE_VIRTUAL_CLUSTERS,
#   VIRTUAL_CLUSTER_ID, EKS_CONTAINER_PROVIDER_CLUSTER_NAME
#
# Expansions are double-quoted so each value is passed as a single argument.
# There is deliberately no `set -e`: every step is attempted even if an
# earlier one fails, so a partial setup is still cleaned up as far as possible.

# detach the policies from the role (a role cannot be deleted while policies are attached)
for policy_name in \
    "$DEBUGGING_MONITORING_POLICY" \
    "$CONTAINER_SUBMIT_JOB_POLICY" \
    "$JOB_EXECUTION_POLICY" \
    "$MANAGE_VIRTUAL_CLUSTERS"
do
    aws iam detach-role-policy \
        --role-name "$JOB_EXECUTION_ROLE" \
        --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/${policy_name}"
done

# delete the job execution role
aws iam delete-role --role-name "$JOB_EXECUTION_ROLE"

# delete the virtual cluster
aws emr-containers delete-virtual-cluster --id "$VIRTUAL_CLUSTER_ID"

# delete the EKS cluster and the node group attached to it
eksctl delete cluster "$EKS_CONTAINER_PROVIDER_CLUSTER_NAME"

astronomer/providers/amazon/aws/example_dags/example_emr.py

Lines changed: 0 additions & 82 deletions
This file was deleted.
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import logging
2+
import os
3+
from datetime import datetime, timedelta
4+
5+
import boto3
6+
from airflow import DAG
7+
from airflow.operators.bash import BashOperator
8+
from airflow.operators.python import PythonOperator
9+
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
10+
from botocore.exceptions import ClientError
11+
12+
from astronomer.providers.amazon.aws.sensors.emr import EmrContainerSensorAsync
13+
14+
# [START howto_operator_emr_eks_env_variables]
15+
VIRTUAL_CLUSTER_ID = os.environ.get("VIRTUAL_CLUSTER_ID", "xxxxxxxx")
AWS_CONN_ID = os.environ.get("ASTRO_AWS_CONN_ID", "aws_default")
JOB_ROLE_ARN = os.environ.get(
    "JOB_ROLE_ARN", "arn:aws:iam::121212121212:role/test_iam_job_execution_role"
)
# [END howto_operator_emr_eks_env_variables]

# IAM job execution role and the names of the policies attached to it
JOB_EXECUTION_ROLE = os.environ.get("JOB_EXECUTION_ROLE", "test_iam_job_execution_role")
DEBUGGING_MONITORING_POLICY = os.environ.get(
    "DEBUGGING_MONITORING_POLICY", "test_debugging_monitoring_policy"
)
CONTAINER_SUBMIT_JOB_POLICY = os.environ.get(
    "CONTAINER_SUBMIT_JOB_POLICY", "test_emr_container_submit_jobs_policy"
)
JOB_EXECUTION_POLICY = os.environ.get("JOB_EXECUTION_POLICY", "test_job_execution_policy")
MANAGE_VIRTUAL_CLUSTERS = os.environ.get("MANAGE_VIRTUAL_CLUSTERS", "test_manage_virtual_clusters")

# EKS cluster, Kubernetes namespace and EMR virtual cluster names
EKS_CONTAINER_PROVIDER_CLUSTER_NAME = os.environ.get(
    "EKS_CONTAINER_PROVIDER_CLUSTER_NAME", "providers-team-eks-cluster"
)
KUBECTL_CLUSTER_NAME = os.environ.get("KUBECTL_CLUSTER_NAME", "providers-team-eks-namespace")
# Note: the environment variable is EMR_VIRTUAL_CLUSTER_NAME, not VIRTUAL_CLUSTER_NAME.
VIRTUAL_CLUSTER_NAME = os.environ.get(
    "EMR_VIRTUAL_CLUSTER_NAME", "providers-team-virtual-eks-cluster"
)

# AWS credentials, region and instance type
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "xxxxxxx")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "xxxxxxxx")
AWS_DEFAULT_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-2")
INSTANCE_TYPE = os.environ.get("INSTANCE_TYPE", "m4.large")
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", "/usr/local/airflow")

# Passed to the DAG as default_args: a 30-minute execution timeout
default_args = dict(execution_timeout=timedelta(minutes=30))
43+
44+
45+
def create_emr_virtual_cluster_func() -> None:
    """
    Create an EMR virtual cluster backed by the EKS cluster namespace.

    On success the id from the API response is written to ``VIRTUAL_CLUSTER_ID``
    in this process's ``os.environ``. A ``ClientError`` raised by the API call is
    logged with its traceback and is not re-raised.
    """
    emr_containers = boto3.client("emr-containers")
    container_provider = {
        "id": EKS_CONTAINER_PROVIDER_CLUSTER_NAME,
        "type": "EKS",
        "info": {"eksInfo": {"namespace": KUBECTL_CLUSTER_NAME}},
    }
    try:
        created = emr_containers.create_virtual_cluster(
            name=VIRTUAL_CLUSTER_NAME,
            containerProvider=container_provider,
        )
    except ClientError:
        logging.exception("Error while creating EMR virtual cluster")
    else:
        # Only reached when the API call succeeded.
        os.environ["VIRTUAL_CLUSTER_ID"] = created["id"]
61+
62+
63+
# [START howto_operator_emr_eks_config]
64+
# Spark job definition: run the bundled pi.py example.
# The executor settings use the Spark property names `spark.executor.instances`
# and `spark.executor.memory`; the previous `spark.executors.*` spellings are
# not Spark properties, so those two settings were never applied.
JOB_DRIVER_ARG = {
    "sparkSubmitJobDriver": {
        "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1",  # noqa: E501
    }
}

# Per-job overrides: a Hive metastore client factory class for Spark, and
# CloudWatch logging under the given log group and stream prefix.
CONFIGURATION_OVERRIDES_ARG = {
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",  # noqa: E501
            },
        }
    ],
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/aws/emr-eks-spark",
            "logStreamNamePrefix": "airflow",
        }
    },
}
87+
# [END howto_operator_emr_eks_config]
88+
89+
# Example DAG: provision EKS + EMR virtual cluster, run a Spark job on it,
# wait for the job with the async sensor, then tear everything down.
with DAG(
    dag_id="emr_eks_pi_job",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    tags=["emr", "example"],
) as dag:
    # Setup steps that make the DAG self-sufficient.
    # NOTE(review): the access key and secret are interpolated into the command
    # text itself, so they appear wherever the rendered command is shown.
    # Consider letting the AWS CLI read them from the environment instead.
    setup_aws_config = BashOperator(
        task_id="setup_aws_config",
        bash_command=f"aws configure set aws_access_key_id {AWS_ACCESS_KEY_ID}; "
        f"aws configure set aws_secret_access_key {AWS_SECRET_ACCESS_KEY}; "
        f"aws configure set default.region {AWS_DEFAULT_REGION}; ",
    )

    # Create the EKS cluster, the Kubernetes namespace and the job execution role.
    # The trailing space in the command is intentional: it stops Airflow from
    # treating the ".sh" path as a Jinja template file to load.
    create_EKS_cluster_kube_namespace_with_role = BashOperator(
        task_id="create_EKS_cluster_kube_namespace_with_role",
        bash_command="sh $AIRFLOW_HOME/dags/example_create_EKS_kube_namespace_with_role.sh ",
    )

    # Create the EMR virtual cluster
    create_EMR_virtual_cluster = PythonOperator(
        task_id="create_EMR_virtual_cluster",
        python_callable=create_emr_virtual_cluster_func,
    )

    # [START howto_operator_run_emr_container_job]
    run_emr_container_job = EmrContainerOperator(
        task_id="run_emr_container_job",
        virtual_cluster_id=VIRTUAL_CLUSTER_ID,
        execution_role_arn=JOB_ROLE_ARN,
        release_label="emr-6.2.0-latest",
        job_driver=JOB_DRIVER_ARG,
        configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
        name="pi.py",
    )
    # [END howto_operator_run_emr_container_job]

    # [START howto_sensor_emr_job_container_sensor]
    emr_job_container_sensor = EmrContainerSensorAsync(
        task_id="emr_job_container_sensor",
        job_id=run_emr_container_job.output,
        virtual_cluster_id=VIRTUAL_CLUSTER_ID,
        poll_interval=5,
        aws_conn_id=AWS_CONN_ID,
    )
    # [END howto_sensor_emr_job_container_sensor]

    # Delete clusters, container providers, role, policy.
    # trigger_rule="all_done" runs the cleanup whether the upstream tasks
    # succeeded or failed.
    remove_clusters_container_role_policy = BashOperator(
        task_id="remove_clusters_container_role_policy",
        bash_command="sh $AIRFLOW_HOME/dags/example_delete_eks_cluster_and_role_policies.sh ",
        trigger_rule="all_done",
    )

    (
        setup_aws_config
        >> create_EKS_cluster_kube_namespace_with_role
        >> create_EMR_virtual_cluster
        >> run_emr_container_job
        >> emr_job_container_sensor
        >> remove_clusters_container_role_policy
    )

astronomer/providers/amazon/aws/triggers/emr.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
197197
elif cluster_state in self.failed_states:
198198
final_message = "EMR job failed"
199199
failure_message = hook.failure_message_from_response(cluster_details)
200-
print("failure_message ", failure_message)
201200
if failure_message:
202201
final_message += " " + failure_message
203202
yield TriggerEvent({"status": "error", "message": final_message})

0 commit comments

Comments
 (0)