#1313 source google ads: write less logs #21517
The first file in the diff (the connector's streams module):
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

+import logging
 from abc import ABC
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
@@ -18,6 +19,24 @@
 from .models import Customer


+class cyclic_sieve:
+    def __init__(self, logger: logging.Logger, fraction: int = 10):
+        self._logger = logger
+        self._cycle_counter = 0
+        self._fraction = fraction
+
+    def __getattr__(self, item):
+        if self._cycle_counter % self._fraction == 0:
+            return getattr(self._logger, item)
+        return self.stub
+
+    def stub(self, *args, **kwargs):
+        pass
+
+    def bump(self):
+        self._cycle_counter += 1
+
+
 def parse_dates(stream_slice):
     start_date = pendulum.parse(stream_slice["start_date"])
     end_date = pendulum.parse(stream_slice["end_date"])
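As a quick illustration of how this class behaves (an editor's sketch, not part of the diff; the logger name and fraction are arbitrary): attribute lookups such as .info or .error are delegated to the wrapped logger only when the internal counter is a multiple of fraction, and otherwise go to the no-op stub.

import logging

logging.basicConfig(level=logging.INFO)
sieve = cyclic_sieve(logging.getLogger("demo"), fraction=3)

for cycle in range(6):
    # Only cycles 0 and 3 are forwarded to the real logger; the rest hit stub().
    sieve.info(f"processing cycle {cycle}")
    sieve.bump()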
@@ -91,6 +110,7 @@ class GoogleAdsStream(Stream, ABC):
     def __init__(self, api: GoogleAds, customers: List[Customer]):
         self.google_ads_client = api
         self.customers = customers
+        self.base_sieve_logger = cyclic_sieve(self.logger, 10)

     def get_query(self, stream_slice: Mapping[str, Any]) -> str:
         query = GoogleAds.convert_schema_into_query(schema=self.get_json_schema(), report_name=self.name)
@@ -105,7 +125,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
             yield {"customer_id": customer.id}

     def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
-        self.logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
+        self.base_sieve_logger.bump()
+        self.base_sieve_logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
         if stream_slice is None:
             return []
@@ -119,7 +140,7 @@ def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = No
                 raise
             for error in exc.failure.errors:
                 if error.error_code.authorization_error == AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED:
-                    self.logger.error(error.message)
+                    self.base_sieve_logger.error(error.message)
                     continue
                 # log and ignore only CUSTOMER_NOT_ENABLED error, otherwise - raise further
                 raise
@@ -139,6 +160,7 @@ def __init__(self, start_date: str, conversion_window_days: int, end_date: str =
         self._end_date = end_date
         self._state = {}
         super().__init__(**kwargs)
+        self.incremental_sieve_logger = cyclic_sieve(self.logger, 10)

     @property
     def state(self) -> MutableMapping[str, Any]:
@@ -154,6 +176,7 @@ def current_state(self, customer_id, default=None):

     def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
         for customer in self.customers:
+            logger = cyclic_sieve(self.logger, 10)
             stream_state = stream_state or {}
             if stream_state.get(customer.id):
                 start_date = stream_state[customer.id].get(self.cursor_field) or self._start_date
@@ -165,7 +188,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
                 start_date = self._start_date

             end_date = self._end_date
-            self.logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
+            logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")

             for chunk in chunk_date_range(
                 start_date=start_date,
@@ -178,7 +201,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
             ):
                 if chunk:
                     chunk["customer_id"] = customer.id
-                self.logger.info(f"Next slice is {chunk}")
+                logger.info(f"Next slice is {chunk}")
+                logger.bump()
                 yield chunk

     def read_records(
@@ -189,7 +213,8 @@ def read_records(
         and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
         """
         while True:
-            self.logger.info("Starting a while loop iteration")
+            self.incremental_sieve_logger.bump()
+            self.incremental_sieve_logger.info("Starting a while loop iteration")
             customer_id = stream_slice and stream_slice["customer_id"]
             try:
                 records = super().read_records(sync_mode, stream_slice=stream_slice)

Review comment on the `self.incremental_sieve_logger.bump()` line:

Going off the prior comment about separate slice and read_records log cycles, do you think it's also worth initializing a new sieve every time read_records is invoked? If the sieve is shared on the stream, we will emit exactly 1/10 of logs, but it could be a little confusing why certain cycles are skipped on different read_records invocations if a prior one has a partially bumped count. I'm open to either option and this is non-blocking on approval, but I think it's worth highlighting when the sieve is shared at this point. If we do decide to have a separate sieve per […]

Reply:

That's a tricky point, because normally the while loop here contains a single iteration. It only goes further if there were errors during the read, so I do not think there's much sense in initializing a new sieve each time.
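To illustrate the concern raised in that comment (an editor's sketch, not code from the connector; read_slice and the numbers are made up, and the cyclic_sieve class from the diff above is assumed to be in scope): with a single sieve stored on the stream, the cycle counter carries over between read_records() invocations, so whether a given invocation logs anything depends on how many bumps earlier invocations performed.

import logging

logging.basicConfig(level=logging.INFO)
shared_sieve = cyclic_sieve(logging.getLogger("stream"), fraction=10)

def read_slice(slice_id, iterations):
    # Mimics read_records(): bump once per while-loop iteration, then log.
    for _ in range(iterations):
        shared_sieve.bump()
        shared_sieve.info(f"slice {slice_id}: loop iteration")

read_slice("a", 3)  # counter ends at 3, so none of these iterations reach the real logger
read_slice("b", 7)  # counter hits 10 on the last iteration, so exactly one line is logged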
@@ -200,38 +225,40 @@ def read_records(
                         date_in_latest_record = pendulum.parse(record[self.cursor_field])
                         cursor_value = (max(date_in_current_stream, date_in_latest_record)).to_date_string()
                         self.state = {customer_id: {self.cursor_field: cursor_value}}
-                        self.logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
+                        self.incremental_sieve_logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
                         yield record
                         continue
                     self.state = {customer_id: {self.cursor_field: record[self.cursor_field]}}
-                    self.logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
+                    self.incremental_sieve_logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
                     yield record
                     continue
             except GoogleAdsException as exception:
-                self.logger.info(f"Caught a GoogleAdsException: {str(exception)}")
+                self.incremental_sieve_logger.info(f"Caught a GoogleAdsException: {str(exception)}")
                 error = next(iter(exception.failure.errors))
                 if error.error_code.request_error == RequestErrorEnum.RequestError.EXPIRED_PAGE_TOKEN:
                     start_date, end_date = parse_dates(stream_slice)
                     current_state = self.current_state(customer_id)
-                    self.logger.info(f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}")
+                    self.incremental_sieve_logger.info(
+                        f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}"
+                    )
                     if (end_date - start_date).days == 1:
                         # If range days is 1, no need in retry, because it's the minimum date range
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                         raise exception
                     elif current_state == stream_slice["start_date"]:
                         # It couldn't read all the records within one day, it will enter an infinite loop,
                         # so raise the error
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                         raise exception
                     # Retry reading records from where it crushed
                     stream_slice["start_date"] = self.current_state(customer_id, default=stream_slice["start_date"])
-                    self.logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
+                    self.incremental_sieve_logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
                 else:
                     # raise caught error for other error statuses
                     raise exception
             else:
                 # return the control if no exception is raised
-                self.logger.info("Current slice has been read. Exiting read_records()")
+                self.incremental_sieve_logger.info("Current slice has been read. Exiting read_records()")
                 return

     def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
The second file in the diff (the connector's unit tests):
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

+import logging
 from unittest.mock import Mock

 import pytest
@@ -12,7 +13,7 @@
 from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests
 from grpc import RpcError
 from source_google_ads.google_ads import GoogleAds
-from source_google_ads.streams import ClickView
+from source_google_ads.streams import ClickView, cyclic_sieve

 from .common import MockGoogleAdsClient as MockGoogleAdsClient
@@ -218,3 +219,14 @@ def test_retry_transient_errors(mocker, config, customers, error_cls):
     records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
     assert mocked_search.call_count == 5
     assert records == []
+
+
+def test_cyclic_sieve(caplog):
+    original_logger = logging.getLogger("test")
+    sieve = cyclic_sieve(original_logger, fraction=10)
+    for _ in range(20):
+        sieve.info("Ground Control to Major Tom")
+        sieve.info("Your circuit's dead, there's something wrong")
+        sieve.info("Can you hear me, Major Tom?")
+        sieve.bump()
+    assert len(caplog.records) == 6  # 20 * 3 / 10

Review comment on `def test_cyclic_sieve(caplog):`:

Thanks for adding tests for the new logger.
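Spelling out the expected count in that final assertion (a back-of-the-envelope check by the editor, not part of the test file): with fraction=10 and one bump per batch of three messages, only the batches sent while the counter is 0 or 10 reach the real logger.

emitting_cycles = [cycle for cycle in range(20) if cycle % 10 == 0]  # [0, 10]
assert len(emitting_cycles) * 3 == 6  # two emitting cycles, three messages each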
Review comment on the `logger = cyclic_sieve(self.logger, 10)` line in `stream_slices()`:

Can we avoid initializing a new cyclic_sieve here, since one should already have been defined on the stream as self.incremental_sieve_logger?

Reply:

Technically we can, but this was done on purpose. The idea was to filter out not just 9 out of 10 messages, but 9 out of 10 message cycles, to keep the logs more consistent: an engineer shouldn't have to piece together messages from different routine calls, cycles, exceptions, and if-branches when debugging. Since stream_slices() is called once per stream before reading records, and read_records() is called once per stream slice, these are two different log cycles. If you still insist, I can reuse self.incremental_sieve_logger here; technically it is not a problem.

Reply:

Ah, got it. That makes sense, and I agree that for a better experience we wouldn't want to share the same log cycles. Can you add a comment here just mentioning that this is why a dedicated logger sieve is initialized at this point?
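One possible way to address that last suggestion (an editor's sketch of the requested comment only, not something taken from the PR) would be to annotate the line in stream_slices() along these lines:

for customer in self.customers:
    # Use a dedicated sieve per customer here so the slice-generation log cycle
    # stays separate from the read_records() cycle tracked by self.incremental_sieve_logger.
    logger = cyclic_sieve(self.logger, 10)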