Skip to content

Implement GCSObjectsWithPrefixExistenceSensorAsync and GCSUploadSessionCompleteSensor #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 8, 2022

Conversation

rajaths010494
Copy link
Contributor

@rajaths010494 rajaths010494 commented Mar 3, 2022

Issue: #69

Implement the sensor GCSObjectsWithPrefixExistenceSensorAsync
Implement triggers for GCSObjectsWithPrefixExistenceSensorAsync
Add example DAG for GCSObjectsWithPrefixExistenceSensorAsync
Add test for GCSObjectsWithPrefixExistenceSensorAsync sensor, trigger and hooks

Implement the sensor GCSUploadSessionCompleteSensor
Implement triggers for GCSUploadSessionCompleteSensor
Add example DAG for GCSUploadSessionCompleteSensor
Add test for GCSUploadSessionCompleteSensor sensor, trigger and hooks

@codecov
Copy link

codecov bot commented Mar 3, 2022

Codecov Report

Merging #90 (8382289) into main (c033f6c) will decrease coverage by 0.16%.
The diff coverage is 99.10%.

❗ Current head 8382289 differs from pull request most recent head 932659f. Consider uploading reports for the commit 932659f to get more accurate results

Impacted file tree graph

@@            Coverage Diff             @@
##             main      #90      +/-   ##
==========================================
- Coverage   89.16%   88.99%   -0.17%     
==========================================
  Files          35       35              
  Lines        1689     1636      -53     
==========================================
- Hits         1506     1456      -50     
+ Misses        183      180       -3     
Impacted Files Coverage Δ
astronomer/providers/google/cloud/sensors/gcs.py 97.82% <96.42%> (-2.18%) ⬇️
astronomer/providers/google/cloud/triggers/gcs.py 100.00% <100.00%> (+7.69%) ⬆️
astronomer/providers/amazon/aws/triggers/s3.py 80.39% <0.00%> (-17.26%) ⬇️
...r/providers/amazon/aws/sensors/redshift_cluster.py 84.00% <0.00%> (-2.96%) ⬇️
...nomer/providers/amazon/aws/hooks/base_aws_async.py 87.09% <0.00%> (-0.41%) ⬇️
...omer/providers/amazon/aws/triggers/redshift_sql.py 100.00% <0.00%> (ø)
.../providers/amazon/aws/triggers/redshift_cluster.py 79.62% <0.00%> (+0.78%) ⬆️
astronomer/providers/amazon/aws/sensors/s3.py 100.00% <0.00%> (+1.56%) ⬆️
...onomer/providers/amazon/aws/hooks/redshift_data.py 98.11% <0.00%> (+3.37%) ⬆️
...mer/providers/amazon/aws/operators/redshift_sql.py 96.00% <0.00%> (+3.69%) ⬆️
... and 2 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c033f6c...932659f. Read the comment docs.

Copy link
Collaborator

@sunank200 sunank200 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, implementation LGTM.

I had a question - Is there a specific reason to keep certain methods as private methods in triggers than to keep in hooks?

Also, there are few comments I have added.

@@ -55,7 +66,7 @@ def __init__(
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def execute(self, context):
def execute(self, context: "Context"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return type is missing for execute().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already existing will be fixed as part of mypy-strict branch



class GCSObjectsWithPrefixExistenceSensorAsync(GCSObjectsWithPrefixExistenceSensor):
def __init__(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring and comment missing explaining the sensor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is inherited from sync version added doc string for only async inherited from base operator

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still needs to be added as we have one more kwarg: polling_interval that's not in the inherited operator. And this will be need for our Sphinx docs



class GCSUploadSessionCompleteSensorAsync(GCSUploadSessionCompleteSensor):
def __init__(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring and comment missing explaining the sensor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is inherited from sync version added doc string for only async inherited from base operator


def execute_complete(
self, context: "Context", event: Optional[Dict[Any, Any]] = None
) -> None: # pylint: disable=unused-argument
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return type is Optional[str] as per the implementation rather than None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the return type


def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""
Serializes GCSBlobTrigger arguments and classpath.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be GCSPrefixBlobTrigger instead of GCSBlobTrigger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

:param prefix: The prefix of the blob_names to match in the Google cloud
storage bucket.
"""
async with Session() as s:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor comment: s can be named better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed it

"""
return datetime.now()

def _is_bucket_updated(self, current_objects: Set[str]) -> dict[str, str]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to keep it in triggers than in hooks?

Copy link
Contributor Author

@rajaths010494 rajaths010494 Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its used by GCSUploadSessionTrigger only if its was reused in other also would have moved to hooks

@rajaths010494 rajaths010494 changed the title Implement GCSObjectsWithPrefixExistenceSensorAsync Implement GCSObjectsWithPrefixExistenceSensorAsync and GCSUploadSessionCompleteSensor Mar 8, 2022

START_DATE = datetime(2022, 1, 1)
START_DATE = datetime(2021, 1, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since START_DATE is getting used once only we should directly use datetime(2021, 1, 1) in line 51

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed it


PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "astronomer-airflow-providers")
BUCKET_1 = "test-gcs-example-bucket"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this also env variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added bucket in env

Comment on lines 1 to 17
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

This project is not maintained by ASF

@kaxil
Copy link
Collaborator

kaxil commented Mar 8, 2022

Need to link PR to issue -> https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword

You need to add the following in the PR description
closes #69

catchup=False,
schedule_interval="@once",
tags=["example"],
schedule_interval='@once',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change

@kaxil kaxil marked this pull request as ready for review March 8, 2022 21:29
@kaxil kaxil requested a review from sunank200 March 8, 2022 21:29
@kaxil kaxil dismissed sunank200’s stale review March 8, 2022 21:29

Addressed changes

@kaxil kaxil merged commit 6a1c9d6 into main Mar 8, 2022
@kaxil kaxil deleted the gcs_sensors branch March 8, 2022 22:06
@kaxil
Copy link
Collaborator

kaxil commented Mar 8, 2022

Well done, I made some changes @rajaths010494 , please check the last few commits and my comments -- do ask if you need any clarification

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants