Skip to content

Commit f8ec4d4

Browse files
committed
Fix pending issues
1 parent 4750c72 commit f8ec4d4

File tree

2 files changed

+70
-10
lines changed
  • astronomer/providers/google/cloud

2 files changed

+70
-10
lines changed

astronomer/providers/google/cloud/sensors/gcs.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,30 @@ def execute_complete(self, context: "Context", event=None) -> None:
9292

9393

9494
class GCSObjectsWithPrefixExistenceSensorAsync(GCSObjectsWithPrefixExistenceSensor):
95+
"""
96+
Async operator that checks for the existence of GCS objects at a given prefix, passing matches via XCom.
97+
98+
When files matching the given prefix are found, the poke method's criteria will be
99+
fulfilled and the matching objects will be returned from the operator and passed
100+
through XCom for downstream tasks.
101+
102+
:param bucket: The Google Cloud Storage bucket where the object is.
103+
:param prefix: The name of the prefix to check in the Google Cloud Storage bucket.
104+
:param google_cloud_conn_id: The connection ID to use when connecting to Google Cloud Storage.
105+
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
106+
if any. For this to work, the service account making the request must have
107+
domain-wide delegation enabled.
108+
:param impersonation_chain: Optional service account to impersonate using short-term
109+
credentials, or chained list of accounts required to get the access_token
110+
of the last account in the list, which will be impersonated in the request.
111+
If set as a string, the account must grant the originating account
112+
the Service Account Token Creator IAM role.
113+
If set as a sequence, the identities from the list must grant
114+
Service Account Token Creator IAM role to the directly preceding identity, with first
115+
account from the list granting this role to the originating account (templated).
116+
:param polling_interval: The interval in seconds to wait between checks for matching objects.
117+
"""
118+
95119
def __init__(
96120
self,
97121
polling_interval: float = 5.0,
@@ -126,6 +150,41 @@ def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] =
126150

127151

128152
class GCSUploadSessionCompleteSensorAsync(GCSUploadSessionCompleteSensor):
153+
"""
154+
Checks for changes in the number of objects at prefix in Google Cloud Storage
155+
bucket and returns True if the inactivity period has passed with no
156+
increase in the number of objects. Note, this sensor will not behave correctly
157+
in reschedule mode, as the state of the listed objects in the GCS bucket will
158+
be lost between rescheduled invocations.
159+
160+
:param bucket: The Google Cloud Storage bucket where the objects are expected.
161+
:param prefix: The name of the prefix to check in the Google Cloud Storage bucket.
162+
:param inactivity_period: The total seconds of inactivity to designate
163+
an upload session is over. Note, this mechanism is not real time and
164+
this operator may not return until a poke_interval after this period
165+
has passed with no additional objects sensed.
166+
:param min_objects: The minimum number of objects needed for upload session
167+
to be considered valid.
168+
:param previous_objects: The set of object ids found during the last poke.
169+
:param allow_delete: Whether this sensor should treat objects being deleted
170+
between pokes as valid behavior. If true, a warning message will be logged
171+
when this happens. If false, an error will be raised.
172+
:param google_cloud_conn_id: The connection ID to use when connecting
173+
to Google Cloud Storage.
174+
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
175+
if any. For this to work, the service account making the request must have
176+
domain-wide delegation enabled.
177+
:param impersonation_chain: Optional service account to impersonate using short-term
178+
credentials, or chained list of accounts required to get the access_token
179+
of the last account in the list, which will be impersonated in the request.
180+
If set as a string, the account must grant the originating account
181+
the Service Account Token Creator IAM role.
182+
If set as a sequence, the identities from the list must grant
183+
Service Account Token Creator IAM role to the directly preceding identity, with first
184+
account from the list granting this role to the originating account (templated).
185+
:param polling_interval: The interval in seconds to wait between checks for matching objects.
186+
"""
187+
129188
def __init__(
130189
self,
131190
polling_interval: float = 5.0,

astronomer/providers/google/cloud/triggers/gcs.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -317,15 +317,16 @@ def _is_bucket_updated(self, current_objects: Set[str]) -> Dict[str, str]:
317317

318318
if current_num_objects >= self.min_objects:
319319
success_message = (
320-
"SUCCESS: Sensor found %s objects at %s. Waited at least %s seconds, with no new objects dropped.",
321-
current_num_objects,
322-
path,
323-
self.inactivity_period,
320+
"SUCCESS: Sensor found %s objects at %s. Waited at least %s "
321+
"seconds, with no new objects dropped."
324322
)
325-
self.log.info(success_message)
326-
return {"status": "success", "message": success_message}
327-
328-
error_message = "FAILURE: Inactivity Period passed, not enough objects found in %s", path
329-
self.log.error(error_message)
330-
return {"status": "error", "message": error_message}
323+
self.log.info(success_message, current_num_objects, path, self.inactivity_seconds)
324+
return {
325+
"status": "success",
326+
"message": success_message % (current_num_objects, path, self.inactivity_seconds),
327+
}
328+
329+
error_message = "FAILURE: Inactivity Period passed, not enough objects found in %s"
330+
self.log.error(error_message, path)
331+
return {"status": "error", "message": error_message % path}
331332
return {"status": "pending"}

0 commit comments

Comments
 (0)