#1313 source google ads: write less logs #21517
Conversation
/test connector=connectors/source-google-ads
Build Failed. Test summary info:
/test connector=connectors/source-google-ads
Build Passed. Test summary info:
A few questions about some specifics of how we use the sieve, but overall the approach looks good.
airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py (review thread outdated, resolved)
@@ -154,6 +177,7 @@ def current_state(self, customer_id, default=None):

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
        for customer in self.customers:
            logger = cyclic_sieve(self.logger, 10)
Can we avoid initializing a new `cyclic_sieve` here, since one should have already been defined on the stream as `self.incremental_sieve_logger`?
Technically we can, but this was done on purpose. The idea was to filter out not just 9 out of 10 messages, but 9 out of 10 message cycles, so the logs stay consistent: when debugging, an engineer shouldn't have to piece together messages from different routine calls, cycles, exceptions, if statements, and so on. Since `stream_slices()` is called once per stream before reading records, and `read_records()` is called once per stream slice, these are two different log cycles.
If you still insist, I can reuse `self.incremental_sieve_logger` here; technically it is not a problem.
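For context, a minimal sketch of what a sieve like this could look like: it proxies the wrapped logger and only forwards calls when an internal counter sits on a pass-through cycle. Only `cyclic_sieve(self.logger, 10)` and `bump()` appear in the diff; the `fraction` parameter name, the `__getattr__` proxying, and the no-op fallback are assumptions, not necessarily the connector's actual code.

```python
import logging


class cyclic_sieve:
    """Proxy a logger, forwarding messages only on every `fraction`-th cycle (sketch)."""

    def __init__(self, logger: logging.Logger, fraction: int = 10):
        self._logger = logger
        self._fraction = fraction
        self._cycle_counter = 0

    def bump(self):
        # Advance to the next log cycle; the caller decides what one "cycle" means.
        self._cycle_counter += 1

    def __getattr__(self, item):
        # Forward logger methods (info, warning, ...) only on pass-through cycles.
        if self._cycle_counter % self._fraction == 0:
            return getattr(self._logger, item)
        return self._noop

    @staticmethod
    def _noop(*args, **kwargs):
        # Swallow log calls on filtered cycles.
        pass
```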
Ah got it, that makes sense, and I agree that for a better experience we wouldn't want to share the same log cycles. Can you add a comment here mentioning that that's why a separate logger sieve is initialized?
@@ -218,3 +219,14 @@ def test_retry_transient_errors(mocker, config, customers, error_cls):
    records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
    assert mocked_search.call_count == 5
    assert records == []


def test_cyclic_sieve(caplog):
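A sketch of what such a `caplog`-based test might look like, assuming a `cyclic_sieve` along the lines of the sketch earlier in this thread; the import path, cycle counts, and assertion are illustrative, not the connector's actual test:

```python
import logging

from source_google_ads.utils import cyclic_sieve  # assumed import path


def test_cyclic_sieve(caplog):
    caplog.set_level(logging.INFO)
    sieve = cyclic_sieve(logging.getLogger("sieve_under_test"), fraction=10)

    for _ in range(25):
        sieve.info("routine message")
        sieve.bump()

    # With fraction=10, only cycles 0, 10 and 20 pass through the sieve.
    assert len(caplog.records) == 3
```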
Thanks for adding tests for the new logger!
/test connector=connectors/source-google-ads
/test connector=connectors/source-google-ads
Build Passed. Test summary info:
Overall no blocking comments, but I had one discussion point about whether we want a new sieve per `read_records` invocation, if that would make interpreting the logs more intuitive.
@@ -189,7 +213,8 @@ def read_records(
        and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
        """
        while True:
            self.logger.info("Starting a while loop iteration")
            self.incremental_sieve_logger.bump()
Going off the prior comment about separate slice and read_records log cycles, do you think it's also worth initializing a new sieve every time `read_records` is invoked? If the sieve is shared on the stream, we will emit exactly 1/10 of logs, but it could be a little confusing why certain cycles are skipped on different `read_records` invocations if a prior one left a partially bumped count.
I'm open to either option and this is non-blocking for approval, but I think it is worth highlighting that the sieve is shared at this point. If we do decide to have a separate sieve per `read_records` invocation, we could probably remove it from `__init__()` and do something similar to the slices, where it is created within this method (see the sketch below).
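A rough sketch of that alternative, assuming Airbyte's usual `read_records` signature and the sieve sketched earlier; this only illustrates the suggestion, it is not what was merged:

```python
def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
    # Hypothetical: a fresh sieve per invocation, so every call starts its own log cycle.
    incremental_sieve_logger = cyclic_sieve(self.logger, 10)
    while True:
        incremental_sieve_logger.bump()
        incremental_sieve_logger.info("Starting a while loop iteration")
        ...  # read the slice, handle retryable errors, break when done
```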
That's a tricky point, because normally the while loop here runs a single iteration; it only goes further if there were errors during the read. So I don't think there is much sense in initializing a new sieve each time `read_records` is invoked. On the other hand, I agree that the logs may become inconsistent if there are two or more while loop iterations, so I moved `incremental_sieve_logger.bump()` outside the loop to keep them consistent (sketched below).
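A hedged sketch of the adopted shape: the sieve stays shared on the stream, and `bump()` moves to just before the retry loop so each `read_records` call counts as one log cycle (signature and surrounding code abbreviated):

```python
def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
    self.incremental_sieve_logger.bump()  # one log cycle per read_records invocation
    while True:
        self.incremental_sieve_logger.info("Starting a while loop iteration")
        ...  # normally a single iteration; the loop repeats only after retryable errors
```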
/publish connector=connectors/source-google-ads
If you have connectors that successfully published but failed definition generation, follow step 4 here.
Airbyte Code Coverage
What
https://github.com/airbytehq/oncall/issues/1313
How
Filter the logs through a cyclic sieve so that only every 10th log cycle is emitted (the other 9 are dropped).
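A hedged usage sketch of the intended effect, reusing a `cyclic_sieve` like the one sketched earlier in the conversation; everything except `cyclic_sieve` and `bump()` is illustrative:

```python
import logging

sieve = cyclic_sieve(logging.getLogger("airbyte"), 10)
for page in range(100):
    sieve.info(f"Processed page {page}")  # emitted only for pages 0, 10, 20, ...
    sieve.bump()
```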