Skip to content

Implement Async GCS object update sensor #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0c6f7e4
Implement GCSObjectWithPrefixExistenceSensor
rajaths010494 Mar 2, 2022
e4f5814
Fix static check in triggers
rajaths010494 Mar 2, 2022
22e22da
add example dag for GCSObjectsWithPrefixExistenceSensorAsync
rajaths010494 Mar 3, 2022
a5aa480
Add tests for GCSObjectsWithPrefixExistenceSensorAsync
rajaths010494 Mar 3, 2022
6207214
Fix mypy issues in gcs sensor
rajaths010494 Mar 3, 2022
027dec9
Implement Async GCSObjectUpdateSensorAsync
bharanidharan14 Mar 3, 2022
4307ebb
Fix Test Coverage Snowflake and Redshift Cluster
bharanidharan14 Mar 4, 2022
2cc560a
Merge branch 'main' into Fix_code_coverage
bharanidharan14 Mar 4, 2022
88a9424
Merge branch 'main' into GCS_ObjectUpdateSensor
bharanidharan14 Mar 4, 2022
6f8f1e8
Add Test case and Add GCSObjectUpdateSensorAsync example operator
bharanidharan14 Mar 4, 2022
60fb334
Add Test case, Example DAG
bharanidharan14 Mar 7, 2022
60c61fe
Removed print, added comments and doc string
bharanidharan14 Mar 7, 2022
b65591f
Fix mypy return type
bharanidharan14 Mar 7, 2022
d50b4d7
Redshift cluster test coverage fix
bharanidharan14 Mar 7, 2022
d9bda2a
Redshift cluster hook test coverage
bharanidharan14 Mar 7, 2022
c99b817
Fix test cases name, Remove unused code, Add docstring
bharanidharan14 Mar 7, 2022
d3c13d5
Update redshift pause and resume cluster succes test case
bharanidharan14 Mar 8, 2022
353dd66
Add docu string and removed unused code
bharanidharan14 Mar 8, 2022
2b38c6e
Update example_redshift_sql.py
bharanidharan14 Mar 8, 2022
a921d14
Fix Mypy issue in Snowflake & core
bharanidharan14 Mar 9, 2022
1292a3d
Resolve conflicts
bharanidharan14 Mar 9, 2022
afdd274
Resolve conflicts
bharanidharan14 Mar 9, 2022
6dc7635
Fix Test case, mypy issue
bharanidharan14 Mar 9, 2022
7215676
Fixed mypy issue
bharanidharan14 Mar 9, 2022
a15f501
Fix in Snowflake hook, GCS
bharanidharan14 Mar 9, 2022
477f6de
Merge branch 'main' into GCS_ObjectUpdateSensor
bharanidharan14 Mar 10, 2022
7c742cb
Fix Doc string, Query
bharanidharan14 Mar 10, 2022
59de310
Merge branch 'GCS_ObjectUpdateSensor' of https://github.com/astronome…
bharanidharan14 Mar 10, 2022
eb31453
Remove Test coverage codes from GCS_objectUpdateSensor branch
bharanidharan14 Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion astronomer/providers/google/cloud/example_dags/example_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from astronomer.providers.google.cloud.sensors.gcs import (
GCSObjectExistenceSensorAsync,
GCSObjectsWithPrefixExistenceSensorAsync,
GCSObjectUpdateSensorAsync,
GCSUploadSessionCompleteSensorAsync,
)

Expand Down Expand Up @@ -71,14 +72,26 @@
task_id="gcs_upload_session_complete_task",
)
# [END howto_sensor_gcs_upload_session_complete_task]
# [START howto_sensor_object_update_exists_task]
gcs_update_object_exists = GCSObjectUpdateSensorAsync(
bucket=BUCKET_1,
object=BUCKET_FILE_LOCATION,
task_id="gcs_object_update_sensor_task_async",
)
# [END howto_sensor_object_update_exists_task]
# [START howto_delete_buckettask]
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)
# [END howto_delete_buckettask]

(
create_bucket
>> upload_file
>> [gcs_object_exists, gcs_object_with_prefix_exists, gcs_upload_session_complete]
>> [
gcs_object_exists,
gcs_object_with_prefix_exists,
gcs_upload_session_complete,
gcs_update_object_exists,
]
>> delete_bucket
)

Expand Down
70 changes: 69 additions & 1 deletion astronomer/providers/google/cloud/sensors/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.sensors.gcs import (
GCSObjectsWithPrefixExistenceSensor,
GCSObjectUpdateSensor,
GCSUploadSessionCompleteSensor,
)

from astronomer.providers.google.cloud.triggers.gcs import (
GCSBlobTrigger,
GCSCheckBlobUpdateTimeTrigger,
GCSPrefixBlobTrigger,
GCSUploadSessionTrigger,
)
Expand Down Expand Up @@ -58,7 +60,6 @@ def __init__(
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs: Any,
) -> None:

super().__init__(**kwargs)
self.bucket = bucket
self.object = object
Expand Down Expand Up @@ -236,3 +237,70 @@ def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, st
return event["message"]
raise AirflowException(event["message"])
raise AirflowException("No event received in trigger callback")


class GCSObjectUpdateSensorAsync(GCSObjectUpdateSensor):
    """
    Async version to check if an object is updated in Google Cloud Storage

    :param bucket: The Google Cloud Storage bucket where the object is.
    :param object: The name of the object to download in the Google cloud
        storage bucket.
    :param ts_func: Callback for defining the update condition. The default callback
        returns execution_date + schedule_interval. The callback takes the context
        as parameter.
    :param google_cloud_conn_id: The connection ID to use when
        connecting to Google Cloud Storage.
    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
        if any. For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param polling_interval: how often, in seconds, the trigger re-checks the object.
    """

    def __init__(
        self,
        polling_interval: float = 5,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.polling_interval = polling_interval

    def execute(self, context: Dict[str, Any]) -> None:
        """Airflow runs this method on the worker and defers using the trigger."""
        # Hand the polling work off to the triggerer; control returns via execute_complete.
        update_trigger = GCSCheckBlobUpdateTimeTrigger(
            bucket=self.bucket,
            object_name=self.object,
            ts=self.ts_func(context),
            polling_period_seconds=self.polling_interval,
            google_cloud_conn_id=self.google_cloud_conn_id,
            hook_params={
                "delegate_to": self.delegate_to,
                "impersonation_chain": self.impersonation_chain,
            },
        )
        self.defer(
            timeout=self.execution_timeout,
            trigger=update_trigger,
            method_name="execute_complete",
        )

    def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[str, str]] = None) -> str:
        """
        Callback for when the trigger fires - returns immediately.
        Relies on trigger to throw an exception, otherwise it assumes execution was
        successful.
        """
        # Guard clauses: no event or a non-success event means the trigger failed.
        if not event:
            raise AirflowException("No event received in trigger callback")
        if event["status"] != "success":
            raise AirflowException(event["message"])
        self.log.info("Sensor checks update time for object %s in bucket : %s", self.object, self.bucket)
        return event["message"]
101 changes: 101 additions & 0 deletions astronomer/providers/google/cloud/triggers/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from aiohttp import ClientSession
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone

from astronomer.providers.google.cloud.hooks.gcs import GCSHookAsync

Expand Down Expand Up @@ -320,3 +321,103 @@ def _is_bucket_updated(self, current_objects: Set[str]) -> Dict[str, str]:
self.log.error(error_message, path)
return {"status": "error", "message": error_message % path}
return {"status": "pending"}


class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
    """
    A trigger that makes an async call to GCS to check whether the object is updated in a bucket.

    :param bucket: google cloud storage bucket name cloud storage where the objects are residing.
    :param object_name: the file or folder present in the bucket
    :param ts: datetime object to compare against the blob's last-updated time
    :param polling_period_seconds: polling period in seconds to check for file/folder
    :param google_cloud_conn_id: reference to the Google Connection
    :param hook_params: dict object that has delegate_to and impersonation_chain
    """

    def __init__(
        self,
        bucket: str,
        object_name: str,
        ts: datetime,
        polling_period_seconds: float,
        google_cloud_conn_id: str,
        hook_params: Dict[str, Any],
    ):
        super().__init__()
        self.bucket = bucket
        self.object_name = object_name
        self.ts = ts
        self.polling_period_seconds = polling_period_seconds
        self.google_cloud_conn_id: str = google_cloud_conn_id
        self.hook_params = hook_params

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Serializes GCSCheckBlobUpdateTimeTrigger arguments and classpath."""
        return (
            "astronomer.providers.google.cloud.triggers.gcs.GCSCheckBlobUpdateTimeTrigger",
            {
                "bucket": self.bucket,
                "object_name": self.object_name,
                "ts": self.ts,
                "polling_period_seconds": self.polling_period_seconds,
                "google_cloud_conn_id": self.google_cloud_conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Simple loop until the object updated time is greater than ts datetime in bucket."""
        try:
            hook = self._get_async_hook()
            while True:
                status, res = await self._is_blob_updated_after(
                    hook=hook, bucket_name=self.bucket, object_name=self.object_name, ts=self.ts
                )
                if status:
                    yield TriggerEvent(res)
                    return
                await asyncio.sleep(self.polling_period_seconds)
        except Exception as e:
            # Surface any unexpected failure as an error event instead of killing the triggerer.
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

    def _get_async_hook(self) -> GCSHookAsync:
        return GCSHookAsync(gcp_conn_id=self.google_cloud_conn_id, **self.hook_params)

    async def _is_blob_updated_after(
        self, hook: GCSHookAsync, bucket_name: str, object_name: str, ts: datetime
    ) -> Tuple[bool, Dict[str, Any]]:
        """
        Checks if the object in the bucket is updated.

        :param hook: GCSHookAsync Hook class
        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob_name to check in the Google cloud
            storage bucket.
        :param ts: context datetime to compare with blob object updated time

        Returns a ``(done, event_payload)`` tuple: ``done`` is True when polling
        should stop (success or missing object), and ``event_payload`` is the dict
        to emit as the TriggerEvent.
        """
        async with ClientSession() as session:
            client = await hook.get_storage_client(session)
            bucket = client.get_bucket(bucket_name)
            blob = await bucket.get_blob(blob_name=object_name)
            if blob is None:
                res = {
                    "message": f"Object ({object_name}) not found in Bucket ({bucket_name})",
                    "status": "error",
                }
                return True, res

            # Blob updated time comes back as a string, so convert it to an aware
            # datetime object to compare the last updated time against ``ts``.
            blob_updated_date = blob.updated  # type: ignore[attr-defined]
            blob_updated_time = datetime.strptime(blob_updated_date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
                tzinfo=timezone.utc
            )

            # strptime always returns a datetime (or raises), so no None-check is
            # needed here; normalize a naive ``ts`` to UTC before comparing.
            if not ts.tzinfo:
                ts = ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s > %s", blob_updated_time, ts)
            if blob_updated_time > ts:
                return True, {"status": "success", "message": "success"}
            return False, {"status": "pending", "message": "pending"}
53 changes: 51 additions & 2 deletions tests/google/cloud/sensors/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from unittest import mock

import pytest
Expand All @@ -24,10 +25,12 @@
from astronomer.providers.google.cloud.sensors.gcs import (
GCSObjectExistenceSensorAsync,
GCSObjectsWithPrefixExistenceSensorAsync,
GCSObjectUpdateSensorAsync,
GCSUploadSessionCompleteSensorAsync,
)
from astronomer.providers.google.cloud.triggers.gcs import (
GCSBlobTrigger,
GCSCheckBlobUpdateTimeTrigger,
GCSPrefixBlobTrigger,
GCSUploadSessionTrigger,
)
Expand All @@ -40,12 +43,13 @@
TEST_MIN_OBJECTS = 1


@pytest.fixture()
def context():
    """
    Creates a minimal task context containing ``data_interval_end``,
    which the sensor's default ``ts_func`` reads.
    """
    context = {"data_interval_end": datetime.utcnow()}
    yield context


def test_gcs_object_existence_sensor_async():
Expand Down Expand Up @@ -175,3 +179,48 @@ def test_gcs_upload_session_complete_sensor_async_execute_complete():
min_objects=TEST_MIN_OBJECTS,
)
assert task.execute_complete(context=None, event={"status": "success", "message": "success"})


def test_gcs_object_update_sensor_async(context):
    """
    Asserts that a task is deferred and a GCSCheckBlobUpdateTimeTrigger will be fired
    when the GCSObjectUpdateSensorAsync is executed.
    """
    task = GCSObjectUpdateSensorAsync(
        task_id="task-id",
        bucket=TEST_BUCKET,
        object=TEST_OBJECT,
        google_cloud_conn_id=TEST_GCP_CONN_ID,
    )
    # execute() must defer rather than run synchronously.
    with pytest.raises(TaskDeferred) as exc:
        task.execute(context)
    assert isinstance(
        exc.value.trigger, GCSCheckBlobUpdateTimeTrigger
    ), "Trigger is not a GCSCheckBlobUpdateTimeTrigger"


def test_gcs_object_update_sensor_async_execute_failure(context):
    """Tests that an AirflowException is raised in case of error event"""
    sensor = GCSObjectUpdateSensorAsync(
        task_id="task-id",
        bucket=TEST_BUCKET,
        object=TEST_OBJECT,
        google_cloud_conn_id=TEST_GCP_CONN_ID,
    )
    error_event = {"status": "error", "message": "test failure message"}
    with pytest.raises(AirflowException):
        sensor.execute_complete(context=None, event=error_event)


def test_gcs_object_update_sensor_async_execute_complete():
    """Asserts that logging occurs as expected"""
    sensor = GCSObjectUpdateSensorAsync(
        task_id="task-id",
        bucket=TEST_BUCKET,
        object=TEST_OBJECT,
        google_cloud_conn_id=TEST_GCP_CONN_ID,
    )
    success_event = {"status": "success", "message": "Job completed"}
    with mock.patch.object(sensor.log, "info") as mock_log_info:
        sensor.execute_complete(context=None, event=success_event)
    mock_log_info.assert_called_with(
        "Sensor checks update time for object %s in bucket : %s", TEST_OBJECT, TEST_BUCKET
    )
Loading