Skip to content

Commit 8382289

Browse files
rajaths010494kaxil
authored andcommitted
Fix mypy changes and add env in example dag
1 parent cd4f34f commit 8382289

File tree

3 files changed

+17
-12
lines changed

3 files changed

+17
-12
lines changed

astronomer/providers/google/cloud/example_dags/example_gcs.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,31 +37,34 @@
3737
GCSUploadSessionCompleteSensorAsync,
3838
)
3939

40-
START_DATE = datetime(2021, 1, 1)
41-
4240
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "astronomer-airflow-providers")
43-
BUCKET_1 = "test-gcs-example-bucket"
41+
BUCKET_1 = os.environ.get("GCP_TEST_BUCKET", "test-gcs-example-bucket")
4442
PATH_TO_UPLOAD_FILE = "dags/example_gcs.py"
4543
PATH_TO_UPLOAD_FILE_PREFIX = "example_"
4644

4745
BUCKET_FILE_LOCATION = "example_gcs.py"
4846

4947
with models.DAG(
5048
"example_async_gcs_sensors",
51-
start_date=START_DATE,
49+
start_date=datetime(2021, 1, 1),
5250
catchup=False,
5351
schedule_interval='@once',
5452
tags=['example'],
5553
) as dag:
54+
55+
# [START howto_create_bucket_task]
5656
create_bucket = GCSCreateBucketOperator(
5757
task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID
5858
)
59+
# [END howto_create_bucket_task]
60+
# [START howto_upload_file_task]
5961
upload_file = LocalFilesystemToGCSOperator(
6062
task_id="upload_file",
6163
src=PATH_TO_UPLOAD_FILE,
6264
dst=BUCKET_FILE_LOCATION,
6365
bucket=BUCKET_1,
6466
)
67+
# [END howto_upload_file_task]
6568
# [START howto_sensor_object_exists_task]
6669
gcs_object_exists = GCSObjectExistenceSensorAsync(
6770
bucket=BUCKET_1,
@@ -76,7 +79,7 @@
7679
task_id="gcs_object_with_prefix_exists_task",
7780
)
7881
# [END howto_sensor_object_with_prefix_exists_task]
79-
82+
# [START howto_sensor_gcs_upload_session_complete_task]
8083
gcs_upload_session_complete = GCSUploadSessionCompleteSensorAsync(
8184
bucket=BUCKET_1,
8285
prefix=PATH_TO_UPLOAD_FILE_PREFIX,
@@ -86,8 +89,10 @@
8689
previous_objects=set(),
8790
task_id="gcs_upload_session_complete_task",
8891
)
89-
92+
# [END howto_sensor_gcs_upload_session_complete_task]
93+
# [START howto_delete_buckettask]
9094
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)
95+
# [END howto_delete_buckettask]
9196

9297
(
9398
create_bucket

astronomer/providers/google/cloud/sensors/gcs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""This module contains Google Cloud Storage sensors."""
22

3-
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
3+
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
44

55
from airflow.exceptions import AirflowException
66
from airflow.models.baseoperator import BaseOperator
@@ -115,7 +115,7 @@ def execute(self, context: "Context") -> None:
115115

116116
def execute_complete(
117117
self, context: "Context", event: Optional[Dict[Any, Any]] = None
118-
) -> List[str]: # pylint: disable=unused-argument
118+
) -> Any: # pylint: disable=unused-argument
119119
"""
120120
Callback for when the trigger fires - returns immediately.
121121
Relies on trigger to throw an exception, otherwise it assumes execution was
@@ -155,7 +155,7 @@ def execute(self, context: "Context") -> None:
155155

156156
def execute_complete(
157157
self, context: "Context", event: Optional[Dict[Any, Any]] = None
158-
) -> None: # pylint: disable=unused-argument
158+
) -> Optional[str]: # pylint: disable=unused-argument
159159
"""
160160
Callback for when the trigger fires - returns immediately.
161161
Relies on trigger to throw an exception, otherwise it assumes execution was

astronomer/providers/google/cloud/triggers/gcs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def __init__(
121121

122122
def serialize(self) -> Tuple[str, Dict[str, Any]]:
123123
"""
124-
Serializes GCSBlobTrigger arguments and classpath.
124+
Serializes GCSPrefixBlobTrigger arguments and classpath.
125125
"""
126126
return (
127127
"astronomer.providers.google.cloud.triggers.gcs.GCSPrefixBlobTrigger",
@@ -162,8 +162,8 @@ async def _list_blobs_with_prefix(self, hook: GCSHookAsync, bucket_name: str, pr
162162
:param prefix: The prefix of the blob_names to match in the Google cloud
163163
storage bucket.
164164
"""
165-
async with Session() as s:
166-
client = await hook.get_storage_client(s)
165+
async with Session() as session:
166+
client = await hook.get_storage_client(session)
167167
bucket = client.get_bucket(bucket_name)
168168
object_response = await bucket.list_blobs(prefix=prefix)
169169
return object_response

0 commit comments

Comments
 (0)