-
Notifications
You must be signed in to change notification settings - Fork 26
Implement GCSObjectsWithPrefixExistenceSensorAsync and GCSUploadSessionCompleteSensor #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## main #90 +/- ##
==========================================
- Coverage 89.16% 88.99% -0.17%
==========================================
Files 35 35
Lines 1689 1636 -53
==========================================
- Hits 1506 1456 -50
+ Misses 183 180 -3
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, implementation LGTM.
I had a question - Is there a specific reason to keep certain methods as private methods in triggers than to keep in hooks?
Also, there are few comments I have added.
@@ -55,7 +66,7 @@ def __init__( | |||
self.delegate_to = delegate_to | |||
self.impersonation_chain = impersonation_chain | |||
|
|||
def execute(self, context): | |||
def execute(self, context: "Context"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return type is missing for execute()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already existing will be fixed as part of mypy-strict branch
|
||
|
||
class GCSObjectsWithPrefixExistenceSensorAsync(GCSObjectsWithPrefixExistenceSensor): | ||
def __init__( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring and comment missing explaining the sensor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is inherited from sync version added doc string for only async inherited from base operator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It still needs to be added as we have one more kwarg: polling_interval
that's not in the inherited operator. And this will be need for our Sphinx docs
|
||
|
||
class GCSUploadSessionCompleteSensorAsync(GCSUploadSessionCompleteSensor): | ||
def __init__( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstring and comment missing explaining the sensor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is inherited from sync version added doc string for only async inherited from base operator
|
||
def execute_complete( | ||
self, context: "Context", event: Optional[Dict[Any, Any]] = None | ||
) -> None: # pylint: disable=unused-argument |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return type is Optional[str]
as per the implementation rather than None
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the return type
|
||
def serialize(self) -> Tuple[str, Dict[str, Any]]: | ||
""" | ||
Serializes GCSBlobTrigger arguments and classpath. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be GCSPrefixBlobTrigger
instead of GCSBlobTrigger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
:param prefix: The prefix of the blob_names to match in the Google cloud | ||
storage bucket. | ||
""" | ||
async with Session() as s: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comment: s
can be named better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed it
""" | ||
return datetime.now() | ||
|
||
def _is_bucket_updated(self, current_objects: Set[str]) -> dict[str, str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to keep it in triggers than in hooks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its used by GCSUploadSessionTrigger only if its was reused in other also would have moved to hooks
|
||
START_DATE = datetime(2022, 1, 1) | ||
START_DATE = datetime(2021, 1, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since START_DATE
is getting used once only we should directly use datetime(2021, 1, 1)
in line 51
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed it
|
||
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "astronomer-airflow-providers") | ||
BUCKET_1 = "test-gcs-example-bucket" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add this also env variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added bucket in env
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# | |
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. |
This project is not maintained by ASF
Need to link PR to issue -> https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword You need to add the following in the PR description |
catchup=False, | ||
schedule_interval="@once", | ||
tags=["example"], | ||
schedule_interval='@once', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated change
- Implement the GCSPrefixBlobTrigger( - Implement the sensor GCSObjectWithPrefixExistenceSensor
- add example dag for GCSObjectsWithPrefixExistenceSensorAsync - modify trigger super.__init__
- Implement the GCSUploadSessionTrigger - Implement the sensor GCSUploadSessionCompleteSensorAsync
We don't use Pylint.
Well done, I made some changes @rajaths010494 , please check the last few commits and my comments -- do ask if you need any clarification |
Issue: #69
Implement the sensor GCSObjectsWithPrefixExistenceSensorAsync
Implement triggers for GCSObjectsWithPrefixExistenceSensorAsync
Add example DAG for GCSObjectsWithPrefixExistenceSensorAsync
Add test for GCSObjectsWithPrefixExistenceSensorAsync sensor, trigger and hooks
Implement the sensor GCSUploadSessionCompleteSensor
Implement triggers for GCSUploadSessionCompleteSensor
Add example DAG for GCSUploadSessionCompleteSensor
Add test for GCSUploadSessionCompleteSensor sensor, trigger and hooks