
Commit 6e553a2

[Dataproc] Optimize Example DAG And Push Job ID in XCOM (#399)
1 parent 338afe1 commit 6e553a2


5 files changed: +42, -101 lines changed


astronomer/providers/google/cloud/example_dags/example_dataproc.py

Lines changed: 9 additions & 85 deletions
@@ -5,13 +5,8 @@

 from airflow import models
 from airflow.providers.google.cloud.operators.dataproc import (
-    ClusterGenerator,
     DataprocCreateClusterOperator,
-    DataprocCreateWorkflowTemplateOperator,
     DataprocDeleteClusterOperator,
-    DataprocInstantiateInlineWorkflowTemplateOperator,
-    DataprocInstantiateWorkflowTemplateOperator,
-    DataprocUpdateClusterOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import (
     GCSCreateBucketOperator,
@@ -40,49 +35,10 @@
         "machine_type_uri": "n1-standard-4",
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
     },
-    "worker_config": {
-        "num_instances": 2,
-        "machine_type_uri": "n1-standard-4",
-        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
-    },
 }

 # [END how_to_cloud_dataproc_create_cluster]

-# Cluster definition: Generating Cluster Config for DataprocCreateClusterOperator
-# [START how_to_cloud_dataproc_create_cluster_generate_cluster_config]
-path = "gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh"
-
-CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
-    project_id="test",
-    zone="us-central1-a",
-    master_machine_type="n1-standard-4",
-    worker_machine_type="n1-standard-4",
-    num_workers=2,
-    storage_bucket="test",
-    init_actions_uris=[path],
-    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
-).make()
-
-create_cluster_operator = DataprocCreateClusterOperator(
-    task_id="create_dataproc_cluster",
-    cluster_name="test",
-    project_id="test",
-    region="us-central1",
-    cluster_config=CLUSTER_GENERATOR_CONFIG,
-)
-# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
-
-# Update options
-# [START how_to_cloud_dataproc_updatemask_cluster_operator]
-CLUSTER_UPDATE = {
-    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
-}
-UPDATE_MASK = {
-    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
-}
-# [END how_to_cloud_dataproc_updatemask_cluster_operator]
-
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}

 # Jobs definitions
@@ -183,39 +139,6 @@
 )
 # [END howto_create_bucket_task]

-# [START how_to_cloud_dataproc_update_cluster_operator]
-scale_cluster = DataprocUpdateClusterOperator(
-    task_id="scale_cluster",
-    cluster_name=CLUSTER_NAME,
-    cluster=CLUSTER_UPDATE,
-    update_mask=UPDATE_MASK,
-    graceful_decommission_timeout=TIMEOUT,
-    project_id=PROJECT_ID,
-    region=REGION,
-)
-# [END how_to_cloud_dataproc_update_cluster_operator]
-
-# [START how_to_cloud_dataproc_create_workflow_template]
-create_workflow_template = DataprocCreateWorkflowTemplateOperator(
-    task_id="create_workflow_template",
-    template=WORKFLOW_TEMPLATE,
-    project_id=PROJECT_ID,
-    region=REGION,
-)
-# [END how_to_cloud_dataproc_create_workflow_template]
-
-# [START how_to_cloud_dataproc_trigger_workflow_template]
-trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
-    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
-)
-# [END how_to_cloud_dataproc_trigger_workflow_template]
-
-# [START how_to_cloud_dataproc_instantiate_inline_workflow_template]
-instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
-    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
-)
-# [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
-
 # [START howto_DataprocSubmitJobOperatorAsync]
 pig_task = DataprocSubmitJobOperatorAsync(
     task_id="pig_task", job=PIG_JOB, region=REGION, project_id=PROJECT_ID
@@ -243,20 +166,21 @@
 # [END howto_DataprocSubmitJobOperatorAsync]
 # [START how_to_cloud_dataproc_delete_cluster_operator]
 delete_cluster = DataprocDeleteClusterOperator(
-    task_id="delete_cluster", project_id=PROJECT_ID, cluster_name=CLUSTER_NAME, region=REGION
+    task_id="delete_cluster",
+    project_id=PROJECT_ID,
+    cluster_name=CLUSTER_NAME,
+    region=REGION,
+    trigger_rule="all_done",
 )
 # [END how_to_cloud_dataproc_delete_cluster_operator]
 # [START howto_delete_buckettask]
 delete_bucket = GCSDeleteBucketOperator(
     task_id="delete_bucket",
     bucket_name=BUCKET,
+    trigger_rule="all_done",
 )
 # [END howto_delete_buckettask]

-create_cluster >> scale_cluster >> create_bucket
-scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
-scale_cluster >> hive_task >> delete_cluster >> delete_bucket
-scale_cluster >> pig_task >> delete_cluster >> delete_bucket
-scale_cluster >> spark_sql_task >> delete_cluster >> delete_bucket
-scale_cluster >> spark_task >> delete_cluster >> delete_bucket
-scale_cluster >> hadoop_task >> delete_cluster >> delete_bucket
+create_cluster >> create_bucket
+create_cluster >> pig_task >> hive_task >> delete_cluster >> delete_bucket
+create_cluster >> spark_task >> spark_sql_task >> hadoop_task >> delete_cluster >> delete_bucket
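
Two things change in the example DAG above: the cleanup tasks get trigger_rule="all_done" so the cluster and bucket are torn down even when an upstream job fails, and the job id that DataprocSubmitJobOperatorAsync now returns from execute_complete is pushed to XCom as the task's return value. A minimal sketch of how a downstream task could read that id; the check_job_id task and its callable are hypothetical additions for illustration, not part of this commit:

from airflow.operators.python import PythonOperator

def _print_job_id(ti):
    # Pull the value DataprocSubmitJobOperatorAsync returned from execute_complete();
    # it is stored in XCom under the default "return_value" key.
    job_id = ti.xcom_pull(task_ids="pig_task")
    print(f"Dataproc job id for pig_task: {job_id}")

check_job_id = PythonOperator(task_id="check_job_id", python_callable=_print_job_id)
pig_task >> check_job_id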

astronomer/providers/google/cloud/operators/dataproc.py

Lines changed: 2 additions & 2 deletions
@@ -81,7 +81,7 @@ def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, st
         """
         if event:
             if event["status"] == "success":
-                self.log.debug("Job %s completed successfully.", self.job_id)
-                return event["message"]
+                self.log.info("Job %s completed successfully.", event["job_id"])
+                return event["job_id"]
             raise AirflowException(event["message"])
         raise AirflowException("No event received in trigger callback")
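
execute_complete now logs and returns event["job_id"] instead of the human-readable message, so the id is what lands in XCom, and it reads the id from the trigger payload rather than self.job_id (the operator is rebuilt when the task resumes from deferral, so attributes set in execute() are not guaranteed to survive). A rough sketch of the round trip, with a made-up job id and a simplified stand-in for the trigger's run loop:

from typing import AsyncIterator
from airflow.triggers.base import TriggerEvent

async def _fire_example() -> AsyncIterator[TriggerEvent]:
    # Simplified stand-in for the trigger's run(): once _get_job_status() reports a
    # terminal state, its dict is wrapped in a TriggerEvent, and that payload is the
    # `event` argument execute_complete() receives when the task resumes.
    status = {"status": "success", "message": "Job completed successfully", "job_id": "job-123"}
    yield TriggerEvent(status)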

astronomer/providers/google/cloud/triggers/dataproc.py

Lines changed: 10 additions & 7 deletions
@@ -28,9 +28,8 @@ def __init__(
         project_id: Optional[str] = None,
         gcp_conn_id: str = "google_cloud_default",
         polling_interval: float = 5.0,
-        **kwargs: Any,
     ) -> None:
-        super().__init__(**kwargs)
+        super().__init__()
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.dataproc_job_id = dataproc_job_id
@@ -70,15 +69,19 @@ async def _get_job_status(self, hook: DataprocHookAsync) -> Dict[str, str]:
         job = await hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id)
         state = job.status.state
         if state == JobStatus.State.ERROR:
-            return {"status": "error", "message": "Job Failed"}
+            return {"status": "error", "message": "Job Failed", "job_id": self.dataproc_job_id}
         elif state in {
             JobStatus.State.CANCELLED,
             JobStatus.State.CANCEL_PENDING,
             JobStatus.State.CANCEL_STARTED,
         }:
-            return {"status": "error", "message": "Job got cancelled"}
+            return {"status": "error", "message": "Job got cancelled", "job_id": self.dataproc_job_id}
         elif JobStatus.State.DONE == state:
-            return {"status": "success", "message": "Job completed successfully"}
+            return {
+                "status": "success",
+                "message": "Job completed successfully",
+                "job_id": self.dataproc_job_id,
+            }
         elif JobStatus.State.ATTEMPT_FAILURE == state:
-            return {"status": "pending", "message": "Job is in pending state"}
-        return {"status": "pending", "message": "Job is in pending state"}
+            return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id}
+        return {"status": "pending", "message": "Job is in pending state", "job_id": self.dataproc_job_id}
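
With **kwargs dropped from __init__ (and the matching super().__init__() call), the trigger is constructed from named parameters only, and every status dict it returns now carries the job id. An illustrative construction, assuming the class is DataprocSubmitTrigger and that dataproc_job_id and region are constructor arguments (dataproc_job_id is assigned in the visible hunk, region is referenced by _get_job_status; only project_id, gcp_conn_id and polling_interval appear in the parameter list shown); the values are placeholders:

from astronomer.providers.google.cloud.triggers.dataproc import DataprocSubmitTrigger

trigger = DataprocSubmitTrigger(
    dataproc_job_id="job-123",  # placeholder job id
    region="us-central1",       # assumed parameter, referenced as self.region
    project_id="my-project",
    gcp_conn_id="google_cloud_default",
    polling_interval=5.0,
)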

tests/google/cloud/operators/test_dataproc.py

Lines changed: 4 additions & 2 deletions
@@ -52,7 +52,7 @@ def test_dataproc_operator_execute_async(mock_submit_job):
     "event",
     [
         ({"status": "error", "message": "test failure message"}),
-        (None),
+        None,
     ],
 )
 @mock.patch("airflow.providers.google.cloud.operators.dataproc.DataprocHook.submit_job")
@@ -73,4 +73,6 @@ def test_dataproc_operator_execute_success_async(mock_submit_job):
     task = DataprocSubmitJobOperatorAsync(
         task_id="task-id", job=SPARK_JOB, region=TEST_REGION, project_id=TEST_PROJECT_ID
     )
-    assert task.execute_complete(context=None, event={"status": "success", "message": "success"})
+    assert task.execute_complete(
+        context=None, event={"status": "success", "message": "success", "job_id": TEST_JOB_ID}
+    )

tests/google/cloud/triggers/test_dataproc.py

Lines changed: 17 additions & 5 deletions
@@ -106,11 +106,23 @@ async def test_dataproc_submit_return_exception(mock_get_job_status):
 @pytest.mark.parametrize(
     "state, response",
     [
-        (JobStatus.State.DONE, {"status": "success", "message": "Job completed successfully"}),
-        (JobStatus.State.ERROR, {"status": "error", "message": "Job Failed"}),
-        (JobStatus.State.CANCELLED, {"status": "error", "message": "Job got cancelled"}),
-        (JobStatus.State.ATTEMPT_FAILURE, {"status": "pending", "message": "Job is in pending state"}),
-        (JobStatus.State.SETUP_DONE, {"status": "pending", "message": "Job is in pending state"}),
+        (
+            JobStatus.State.DONE,
+            {"status": "success", "message": "Job completed successfully", "job_id": TEST_JOB_ID},
+        ),
+        (JobStatus.State.ERROR, {"status": "error", "message": "Job Failed", "job_id": TEST_JOB_ID}),
+        (
+            JobStatus.State.CANCELLED,
+            {"status": "error", "message": "Job got cancelled", "job_id": TEST_JOB_ID},
+        ),
+        (
+            JobStatus.State.ATTEMPT_FAILURE,
+            {"status": "pending", "message": "Job is in pending state", "job_id": TEST_JOB_ID},
+        ),
+        (
+            JobStatus.State.SETUP_DONE,
+            {"status": "pending", "message": "Job is in pending state", "job_id": TEST_JOB_ID},
+        ),
     ],
 )
 async def test_dataproc_get_job_status(state, response):
