
#1313 source google ads: write less logs #21517


Merged: 13 commits, Jan 23, 2023
Changes from 5 commits
@@ -13,5 +13,5 @@ COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/source-google-ads

Large diffs are not rendered by default.

@@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

@@ -18,6 +19,24 @@
from .models import Customer


class cyclic_sieve:
def __init__(self, logger: logging.Logger, fraction: int = 10):
self._logger = logger
self._cycle_counter = 0
self._fraction = fraction

def __getattr__(self, item):
if self._cycle_counter % self._fraction == 0:
return getattr(self._logger, item)
return self.stub

def stub(self, *args, **kwargs):
pass

def bump(self):
self._cycle_counter += 1


def parse_dates(stream_slice):
start_date = pendulum.parse(stream_slice["start_date"])
end_date = pendulum.parse(stream_slice["end_date"])
@@ -91,6 +110,7 @@ class GoogleAdsStream(Stream, ABC):
def __init__(self, api: GoogleAds, customers: List[Customer]):
self.google_ads_client = api
self.customers = customers
self.base_sieve_logger = cyclic_sieve(self.logger, 10)

def get_query(self, stream_slice: Mapping[str, Any]) -> str:
query = GoogleAds.convert_schema_into_query(schema=self.get_json_schema(), report_name=self.name)
@@ -105,7 +125,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
yield {"customer_id": customer.id}

def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
self.logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
self.base_sieve_logger.bump()
self.base_sieve_logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
if stream_slice is None:
return []

@@ -119,7 +140,7 @@ def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = No
raise
for error in exc.failure.errors:
if error.error_code.authorization_error == AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED:
self.logger.error(error.message)
self.base_sieve_logger.error(error.message)
continue
# log and ignore only CUSTOMER_NOT_ENABLED error, otherwise - raise further
raise
@@ -139,6 +160,7 @@ def __init__(self, start_date: str, conversion_window_days: int, end_date: str =
self._end_date = end_date
self._state = {}
super().__init__(**kwargs)
self.incremental_sieve_logger = cyclic_sieve(self.logger, 10)

@property
def state(self) -> MutableMapping[str, Any]:
@@ -154,6 +176,7 @@ def current_state(self, customer_id, default=None):

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
for customer in self.customers:
logger = cyclic_sieve(self.logger, 10)
Contributor:

Can we avoid initializing a new cyclic_sieve since it should have already been defined on the stream as self.incremental_sieve_logger?

Collaborator Author:

Technically we can, but this was done on purpose. The idea is to filter out not just 9 out of 10 messages, but 9 out of 10 message cycles, so the logs stay consistent and an engineer doesn't have to piece together messages from different routine calls / cycles / exceptions / if statements when debugging. Since stream_slices() is called once per stream before any records are read, and read_records() is called once per stream slice, these are two different log cycles.
If you still prefer that, I can reuse self.incremental_sieve_logger here; technically it is not a problem.
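To illustrate, here is a minimal sketch (not part of this diff) of how the cycle-level filtering behaves; it assumes cyclic_sieve is importable from source_google_ads.streams, as in the unit tests below:

```python
# Illustrative only: all messages of a cycle are emitted together or dropped
# together, because the counter only changes on bump(), not per message.
import logging

from source_google_ads.streams import cyclic_sieve  # assumes the class from this diff

logging.basicConfig(level=logging.INFO)
sieve = cyclic_sieve(logging.getLogger("demo"), fraction=10)

for cycle in range(20):
    sieve.info(f"cycle {cycle}: starting")
    sieve.info(f"cycle {cycle}: done")
    sieve.bump()
# Only cycles 0 and 10 pass the sieve: 2 cycles x 2 messages = 4 log records.
```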

Contributor:

Ah, got it. That makes sense, and I agree that for a better experience we wouldn't want to share the same log cycles. Can you add a comment here mentioning that this is why a separate logger sieve is initialized?

stream_state = stream_state or {}
if stream_state.get(customer.id):
start_date = stream_state[customer.id].get(self.cursor_field) or self._start_date
@@ -165,7 +188,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
start_date = self._start_date

end_date = self._end_date
self.logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")

for chunk in chunk_date_range(
start_date=start_date,
@@ -178,7 +201,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
):
if chunk:
chunk["customer_id"] = customer.id
self.logger.info(f"Next slice is {chunk}")
logger.info(f"Next slice is {chunk}")
logger.bump()
yield chunk

def read_records(
@@ -189,7 +213,8 @@ def read_records(
and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
"""
while True:
self.logger.info("Starting a while loop iteration")
self.incremental_sieve_logger.bump()
Contributor:

Following on from the prior comment about separate slice and read_records log cycles, do you think it's also worth initializing a new sieve every time read_records is invoked? If the sieve is shared on the stream, we will emit exactly 1/10 of the logs, but it could be a little confusing why certain cycles are skipped across different read_records invocations if a prior one left a partially bumped count.

I'm open to either option and this is non-blocking for approval, but I think it's worth highlighting that the sieve is shared at this point. If we do decide to have a separate sieve per read_records invocation, we could probably remove it from __init__() and do something similar to the slices, where it is created within this method.

Collaborator Author:

That's a tricky point, because normally the while loop here runs a single iteration; it only continues if there were errors during the read. So I don't think there's much point in initializing a new sieve each time read_records is invoked. On the other hand, I agree that the logs may become inconsistent if there are two or more while loop iterations, so I moved incremental_sieve_logger.bump() outside the loop to keep them consistent.
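Roughly, the arrangement looks like this (illustrative sketch only, with a simplified stand-in for the real read_records(); it again assumes cyclic_sieve is importable from source_google_ads.streams):

```python
# Illustrative sketch: with bump() moved outside the retry loop, every
# iteration of one read_records() call logs within the same cycle.
import logging

from source_google_ads.streams import cyclic_sieve

logging.basicConfig(level=logging.INFO)
sieve = cyclic_sieve(logging.getLogger("demo"), fraction=10)

def read_records_sketch(retries: int) -> None:
    sieve.bump()  # once per invocation, not once per while-loop iteration
    for attempt in range(retries):  # stands in for the `while True` retry loop
        sieve.info(f"Starting a while loop iteration (attempt {attempt})")

for _ in range(20):
    read_records_sketch(retries=2)
# Messages from any single invocation are emitted or dropped together,
# so a multi-retry invocation never produces a partial log trail.
```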

self.incremental_sieve_logger.info("Starting a while loop iteration")
customer_id = stream_slice and stream_slice["customer_id"]
try:
records = super().read_records(sync_mode, stream_slice=stream_slice)
@@ -200,38 +225,40 @@ def read_records(
date_in_latest_record = pendulum.parse(record[self.cursor_field])
cursor_value = (max(date_in_current_stream, date_in_latest_record)).to_date_string()
self.state = {customer_id: {self.cursor_field: cursor_value}}
self.logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
self.incremental_sieve_logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
yield record
continue
self.state = {customer_id: {self.cursor_field: record[self.cursor_field]}}
self.logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
self.incremental_sieve_logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
yield record
continue
except GoogleAdsException as exception:
self.logger.info(f"Caught a GoogleAdsException: {str(exception)}")
self.incremental_sieve_logger.info(f"Caught a GoogleAdsException: {str(exception)}")
error = next(iter(exception.failure.errors))
if error.error_code.request_error == RequestErrorEnum.RequestError.EXPIRED_PAGE_TOKEN:
start_date, end_date = parse_dates(stream_slice)
current_state = self.current_state(customer_id)
self.logger.info(f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}")
self.incremental_sieve_logger.info(
f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}"
)
if (end_date - start_date).days == 1:
# If range days is 1, no need in retry, because it's the minimum date range
self.logger.error("Page token has expired.")
self.incremental_sieve_logger.error("Page token has expired.")
raise exception
elif current_state == stream_slice["start_date"]:
# It couldn't read all the records within one day, it will enter an infinite loop,
# so raise the error
self.logger.error("Page token has expired.")
self.incremental_sieve_logger.error("Page token has expired.")
raise exception
# Retry reading records from where it crushed
stream_slice["start_date"] = self.current_state(customer_id, default=stream_slice["start_date"])
self.logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
self.incremental_sieve_logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
else:
# raise caught error for other error statuses
raise exception
else:
# return the control if no exception is raised
self.logger.info("Current slice has been read. Exiting read_records()")
self.incremental_sieve_logger.info("Current slice has been read. Exiting read_records()")
return

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
@@ -526,15 +526,15 @@ def test_invalid_custom_query_handled(mocked_gads_api, config):


@pytest.mark.parametrize(
("cls", "error", "failure_code", "raise_expected", "log_expected"),
("cls", "error", "failure_code", "raise_expected"),
(
(AdGroupLabels, "authorization_error", AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED, False, True),
(AdGroupLabels, "internal_error", 1, True, False),
(ServiceAccounts, "authentication_error", 1, True, False),
(ServiceAccounts, "internal_error", 1, True, False),
(AdGroupLabels, "authorization_error", AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED, False),
(AdGroupLabels, "internal_error", 1, True),
(ServiceAccounts, "authentication_error", 1, True),
(ServiceAccounts, "internal_error", 1, True),
),
)
def test_read_record_error_handling(config, customers, caplog, mocked_gads_api, cls, error, failure_code, raise_expected, log_expected):
def test_read_record_error_handling(config, customers, caplog, mocked_gads_api, cls, error, failure_code, raise_expected):
error_msg = "Some unexpected error"
mocked_gads_api(failure_code=failure_code, failure_msg=error_msg, error_type=error)
google_api = GoogleAds(credentials=config["credentials"])
@@ -546,8 +546,6 @@ def test_read_record_error_handling(config, customers, caplog, mocked_gads_api,
else:
for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890"}):
pass
error_in_log = error_msg in caplog.text
assert error_in_log is log_expected


def test_stream_slices(config, customers):
@@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from unittest.mock import Mock

import pytest
@@ -12,7 +13,7 @@
from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests
from grpc import RpcError
from source_google_ads.google_ads import GoogleAds
from source_google_ads.streams import ClickView
from source_google_ads.streams import ClickView, cyclic_sieve

from .common import MockGoogleAdsClient as MockGoogleAdsClient

@@ -218,3 +219,14 @@ def test_retry_transient_errors(mocker, config, customers, error_cls):
records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
assert mocked_search.call_count == 5
assert records == []


def test_cyclic_sieve(caplog):
Contributor:

Thanks for adding tests for the new logger!

original_logger = logging.getLogger("test")
sieve = cyclic_sieve(original_logger, fraction=10)
for _ in range(20):
sieve.info("Ground Control to Major Tom")
sieve.info("Your circuit's dead, there's something wrong")
sieve.info("Can you hear me, Major Tom?")
sieve.bump()
assert len(caplog.records) == 6 # 20 * 3 / 10
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
@@ -139,6 +139,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `0.2.8` | 2023-01-18 | [21517](https://github.com/airbytehq/airbyte/pull/21517) | Write fewer logs |
| `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755) | Add more logs to debug stuck syncs |
| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors |
| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream |