-
Notifications
You must be signed in to change notification settings - Fork 27
EMR container self sufficient example DAG #242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
4204e99
Add step for the DAG to be self sufficient DAG
bharanidharan14 b51dd03
Merge branch 'main' into emr_container_example_dag
bharanidharan14 49dbc13
Add Docker file change to install kubectl
bharanidharan14 f16f3b7
Add clean up script for the EMR on EKS cluster
bharanidharan14 8c33be5
Merge branch 'main' into emr_container_example_dag
bharanidharan14 721721e
Remove unused code, Rename task name
bharanidharan14 e5643b9
Reduce the line and layer of Docker cmd
bharanidharan14 ee8fbda
Update Docker file
bharanidharan14 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
astronomer/providers/amazon/aws/example_dags/example_create_EKS_kube_namespace_with_role.sh
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
#!/bin/bash | ||
|
||
# create cluster | ||
eksctl create cluster \ | ||
--name $EKS_CONTAINER_PROVIDER_CLUSTER_NAME \ | ||
--region $AWS_DEFAULT_REGION \ | ||
--with-oidc \ | ||
--ssh-access \ | ||
--ssh-public-key providers_team_keypair \ | ||
--instance-types=$INSTANCE_TYPE \ | ||
--managed | ||
|
||
# create kubectl cluster namespace | ||
kubectl create namespace $KUBECTL_CLUSTER_NAME | ||
|
||
eksctl create iamidentitymapping \ | ||
--cluster $EKS_CONTAINER_PROVIDER_CLUSTER_NAME \ | ||
--namespace $KUBECTL_CLUSTER_NAME \ | ||
--service-name "emr-containers" | ||
|
||
aws eks describe-cluster --name $EKS_CONTAINER_PROVIDER_CLUSTER_NAME --query "cluster.identity.oidc.issuer" | ||
|
||
eksctl utils associate-iam-oidc-provider --cluster $EKS_CONTAINER_PROVIDER_CLUSTER_NAME --approve | ||
|
||
aws iam create-role --role-name $JOB_EXECUTION_ROLE --assume-role-policy-document '{"Version": "2012-10-17","Statement": | ||
[{"Effect": "Allow","Principal": {"AWS": "arn:aws:iam::'$AWS_ACCOUNT_ID':root"},"Action": | ||
"sts:AssumeRole","Condition": {}}]}' | ||
|
||
|
||
aws iam attach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$DEBUGGING_MONITORING_POLICY | ||
aws iam attach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$CONTAINER_SUBMIT_JOB_POLICY | ||
aws iam attach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$JOB_EXECUTION_POLICY | ||
aws iam attach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$MANAGE_VIRTUAL_CLUSTERS | ||
|
||
|
||
aws emr-containers update-role-trust-policy \ | ||
--cluster-name $EKS_CONTAINER_PROVIDER_CLUSTER_NAME \ | ||
--namespace $KUBECTL_CLUSTER_NAME \ | ||
--role-name $JOB_EXECUTION_ROLE | ||
|
||
export JOB_ROLE_ARN="arn:aws:iam::"$AWS_ACCOUNT_ID":role/"$JOB_EXECUTION_ROLE |
16 changes: 16 additions & 0 deletions
16
astronomer/providers/amazon/aws/example_dags/example_delete_eks_cluster_and_role_policies.sh
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
#!/bin/bash | ||
|
||
# cmd to delete the policy attached to the role | ||
aws iam detach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$DEBUGGING_MONITORING_POLICY | ||
aws iam detach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$CONTAINER_SUBMIT_JOB_POLICY | ||
aws iam detach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$JOB_EXECUTION_POLICY | ||
aws iam detach-role-policy --role-name $JOB_EXECUTION_ROLE --policy-arn arn:aws:iam::$AWS_ACCOUNT_ID:policy/$MANAGE_VIRTUAL_CLUSTERS | ||
|
||
# cmd to delete the role attached to the cluster | ||
aws iam delete-role --role-name $JOB_EXECUTION_ROLE | ||
|
||
# delete the virtual cluster | ||
aws emr-containers delete-virtual-cluster --id $VIRTUAL_CLUSTER_ID | ||
|
||
# cmd to delete the EKS cluster and node group attached to it | ||
eksctl delete cluster $EKS_CONTAINER_PROVIDER_CLUSTER_NAME |
82 changes: 0 additions & 82 deletions
82
astronomer/providers/amazon/aws/example_dags/example_emr.py
This file was deleted.
Oops, something went wrong.
153 changes: 153 additions & 0 deletions
153
astronomer/providers/amazon/aws/example_dags/example_emr_eks_containers_job.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
import logging | ||
import os | ||
from datetime import datetime, timedelta | ||
|
||
import boto3 | ||
from airflow import DAG | ||
from airflow.operators.bash import BashOperator | ||
from airflow.operators.python import PythonOperator | ||
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator | ||
from botocore.exceptions import ClientError | ||
|
||
from astronomer.providers.amazon.aws.sensors.emr import EmrContainerSensorAsync | ||
|
||
# [START howto_operator_emr_eks_env_variables] | ||
VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "xxxxxxxx") | ||
AWS_CONN_ID = os.getenv("ASTRO_AWS_CONN_ID", "aws_default") | ||
JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", "arn:aws:iam::121212121212:role/test_iam_job_execution_role") | ||
# [END howto_operator_emr_eks_env_variables] | ||
|
||
# Job role name and policy name attached to the role | ||
JOB_EXECUTION_ROLE = os.getenv("JOB_EXECUTION_ROLE", "test_iam_job_execution_role") | ||
DEBUGGING_MONITORING_POLICY = os.getenv("DEBUGGING_MONITORING_POLICY", "test_debugging_monitoring_policy") | ||
CONTAINER_SUBMIT_JOB_POLICY = os.getenv( | ||
"CONTAINER_SUBMIT_JOB_POLICY", "test_emr_container_submit_jobs_policy" | ||
) | ||
JOB_EXECUTION_POLICY = os.getenv("JOB_EXECUTION_POLICY", "test_job_execution_policy") | ||
MANAGE_VIRTUAL_CLUSTERS = os.getenv("MANAGE_VIRTUAL_CLUSTERS", "test_manage_virtual_clusters") | ||
|
||
EKS_CONTAINER_PROVIDER_CLUSTER_NAME = os.getenv( | ||
"EKS_CONTAINER_PROVIDER_CLUSTER_NAME", "providers-team-eks-cluster" | ||
) | ||
KUBECTL_CLUSTER_NAME = os.getenv("KUBECTL_CLUSTER_NAME", "providers-team-eks-namespace") | ||
VIRTUAL_CLUSTER_NAME = os.getenv("EMR_VIRTUAL_CLUSTER_NAME", "providers-team-virtual-eks-cluster") | ||
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "xxxxxxx") | ||
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "xxxxxxxx") | ||
AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-2") | ||
INSTANCE_TYPE = os.getenv("INSTANCE_TYPE", "m4.large") | ||
AIRFLOW_HOME = os.getenv("AIRFLOW_HOME", "/usr/local/airflow") | ||
|
||
default_args = { | ||
"execution_timeout": timedelta(minutes=30), | ||
} | ||
|
||
|
||
def create_emr_virtual_cluster_func() -> None: | ||
"""Create EMR virtual cluster in container""" | ||
client = boto3.client("emr-containers") | ||
try: | ||
response = client.create_virtual_cluster( | ||
name=VIRTUAL_CLUSTER_NAME, | ||
containerProvider={ | ||
"id": EKS_CONTAINER_PROVIDER_CLUSTER_NAME, | ||
"type": "EKS", | ||
"info": {"eksInfo": {"namespace": KUBECTL_CLUSTER_NAME}}, | ||
}, | ||
) | ||
os.environ["VIRTUAL_CLUSTER_ID"] = response["id"] | ||
bharanidharan14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except ClientError: | ||
logging.exception("Error while creating EMR virtual cluster") | ||
return None | ||
bharanidharan14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
# [START howto_operator_emr_eks_config] | ||
JOB_DRIVER_ARG = { | ||
"sparkSubmitJobDriver": { | ||
"entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", | ||
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1", # noqa: E501 | ||
} | ||
} | ||
|
||
CONFIGURATION_OVERRIDES_ARG = { | ||
"applicationConfiguration": [ | ||
{ | ||
"classification": "spark-defaults", | ||
"properties": { | ||
"spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", # noqa: E501 | ||
}, | ||
} | ||
], | ||
"monitoringConfiguration": { | ||
"cloudWatchMonitoringConfiguration": { | ||
"logGroupName": "/aws/emr-eks-spark", | ||
"logStreamNamePrefix": "airflow", | ||
} | ||
}, | ||
} | ||
# [END howto_operator_emr_eks_config] | ||
|
||
with DAG( | ||
dag_id="emr_eks_pi_job", | ||
start_date=datetime(2022, 1, 1), | ||
schedule_interval=None, | ||
catchup=False, | ||
default_args=default_args, | ||
tags=["emr", "example"], | ||
) as dag: | ||
# Task steps for DAG to be self-sufficient | ||
setup_aws_config = BashOperator( | ||
task_id="setup_aws_config", | ||
bash_command=f"aws configure set aws_access_key_id {AWS_ACCESS_KEY_ID}; " | ||
f"aws configure set aws_secret_access_key {AWS_SECRET_ACCESS_KEY}; " | ||
f"aws configure set default.region {AWS_DEFAULT_REGION}; ", | ||
) | ||
|
||
# Task to create EMR clusters on EKS | ||
create_EKS_cluster_kube_namespace_with_role = BashOperator( | ||
task_id="create_EKS_cluster_kube_namespace_with_role", | ||
bash_command="sh $AIRFLOW_HOME/dags/example_create_EKS_kube_namespace_with_role.sh ", | ||
) | ||
|
||
# Task to create EMR virtual cluster | ||
create_EMR_virtual_cluster = PythonOperator( | ||
task_id="create_EMR_virtual_cluster", | ||
python_callable=create_emr_virtual_cluster_func, | ||
) | ||
|
||
# [START howto_operator_run_emr_container_job] | ||
run_emr_container_job = EmrContainerOperator( | ||
task_id="run_emr_container_job", | ||
virtual_cluster_id=VIRTUAL_CLUSTER_ID, | ||
execution_role_arn=JOB_ROLE_ARN, | ||
release_label="emr-6.2.0-latest", | ||
job_driver=JOB_DRIVER_ARG, | ||
configuration_overrides=CONFIGURATION_OVERRIDES_ARG, | ||
name="pi.py", | ||
) | ||
# [END howto_operator_emr_eks_jobrun] | ||
|
||
# [START howto_sensor_emr_job_container_sensor] | ||
emr_job_container_sensor = EmrContainerSensorAsync( | ||
task_id="emr_job_container_sensor", | ||
job_id=run_emr_container_job.output, | ||
virtual_cluster_id=VIRTUAL_CLUSTER_ID, | ||
poll_interval=5, | ||
aws_conn_id=AWS_CONN_ID, | ||
) | ||
# [END howto_sensor_emr_job_container_sensor] | ||
|
||
# Delete clusters, container providers, role, policy | ||
remove_clusters_container_role_policy = BashOperator( | ||
task_id="remove_clusters_container_role_policy", | ||
bash_command="sh $AIRFLOW_HOME/dags/example_delete_eks_cluster_and_role_policies.sh ", | ||
trigger_rule="all_done", | ||
) | ||
|
||
( | ||
setup_aws_config | ||
>> create_EKS_cluster_kube_namespace_with_role | ||
>> create_EMR_virtual_cluster | ||
>> run_emr_container_job | ||
>> emr_job_container_sensor | ||
>> remove_clusters_container_role_policy | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.