diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
index e3e6b791c953c..d03cf47f40a7b 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
@@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.0.3
+LABEL io.airbyte.version=0.0.4
 LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
index 3ceab93b64995..2808f1445293f 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
@@ -1,31 +1,39 @@
 # See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
 # for more information about how to configure these tests
 connector_image: airbyte/source-google-analytics-data-api:dev
-tests:
+acceptance_tests:
   spec:
+    tests:
     - spec_path: "source_google_analytics_data_api/spec.json"
   connection:
-  - config_path: "secrets/config.json"
-    status: "succeed"
-  - config_path: "integration_tests/invalid_config.json"
-    status: "failed"
+    tests:
+    - config_path: "secrets/config.json"
+      status: "succeed"
+    - config_path: "integration_tests/invalid_config.json"
+      status: "failed"
   discovery:
-  - config_path: "secrets/config.json"
-    backward_compatibility_tests_config:
-      disable_for_version: "0.0.2"
+    tests:
+    - config_path: "secrets/config.json"
   basic_read:
-  - config_path: "secrets/config.json"
-    configured_catalog_path: "integration_tests/configured_catalog.json"
-    empty_streams: []
+    tests:
+    - config_path: "secrets/config.json"
+      empty_streams: []
+  incremental:
+    tests:
+    - config_path: "secrets/config.json"
+      configured_catalog_path: "integration_tests/configured_catalog.json"
+      future_state:
+        future_state_path: "integration_tests/abnormal_state.json"
   full_refresh:
-  - config_path: "secrets/config.json"
-    configured_catalog_path: "integration_tests/configured_catalog.json"
-    ignored_fields:
-      "daily_active_users": ["uuid"]
-      "weekly_active_users": ["uuid"]
-      "four_weekly_active_users": ["uuid"]
-      "devices": ["uuid"]
-      "locations": ["uuid"]
-      "pages": ["uuid"]
-      "traffic_sources": ["uuid"]
-      "website_overview": ["uuid"]
+    tests:
+    - config_path: "secrets/config.json"
+      configured_catalog_path: "integration_tests/configured_catalog.json"
+      ignored_fields:
+        "daily_active_users": ["uuid"]
+        "weekly_active_users": ["uuid"]
+        "four_weekly_active_users": ["uuid"]
+        "devices": ["uuid"]
+        "locations": ["uuid"]
+        "pages": ["uuid"]
+        "traffic_sources": ["uuid"]
+        "website_overview": ["uuid"]
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json
new file mode 100644
index 0000000000000..5902a0f1fec9a
--- /dev/null
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json
@@ -0,0 +1,10 @@
+{
+  "daily_active_users": {"date": "20501201"},
+  "weekly_active_users": {"date": "20501201"},
+  "four_weekly_active_users": {"date": "20501201"},
+  "devices": {"date": "20501201"},
+  "locations": {"date": "20501201"},
+  "pages": {"date": "20501201"},
+  "traffic_sources": {"date": "20501201"},
+  "website_overview": {"date": "20501201"}
+}
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/sample_state.json
new file mode 100644
index 0000000000000..3d058228c7bda
--- /dev/null
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/sample_state.json
@@ -0,0 +1,10 @@
+{
+  "daily_active_users": {"date": "20221201"},
+  "weekly_active_users": {"date": "20221201"},
+  "four_weekly_active_users": {"date": "20221201"},
+  "devices": {"date": "20221201"},
+  "locations": {"date": "20221201"},
+  "pages": {"date": "20221201"},
+  "traffic_sources": {"date": "20221201"},
+  "website_overview": {"date": "20221201"}
+}
\ No newline at end of file
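Note: the state files above store each stream's cursor in the Data API's record date format ("%Y%m%d"), while date_ranges_start_date in the connector config uses "%Y-%m-%d"; the _date_parse_probe helper added to source.py below accepts either. A minimal standalone sketch of that fallback parse (the constant and function names here are illustrative, not the connector's):

import datetime

RECORD_DATE_FORMAT = "%Y%m%d"    # format used in saved stream state, e.g. "20221201"
CONFIG_DATE_FORMAT = "%Y-%m-%d"  # format used in the connector config, e.g. "2022-12-01"

def parse_state_date(date_string: str) -> datetime.date:
    # Try the record format first, then fall back to the config format,
    # mirroring the try/except fallback in _date_parse_probe below.
    try:
        return datetime.datetime.strptime(date_string, RECORD_DATE_FORMAT).date()
    except ValueError:
        return datetime.datetime.strptime(date_string, CONFIG_DATE_FORMAT).date()

assert parse_state_date("20501201") == datetime.date(2050, 12, 1)
assert parse_state_date("2050-12-01") == datetime.date(2050, 12, 1)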
h in r.get("dimensionHeaders", [])] + metrics = [h["name"] for h in r.get("metricHeaders", [])] + metrics_type_map = {h["name"]: h["type"] for h in r.get("metricHeaders", [])} rows = [] for row in r.get("rows", []): - rows.append( - collections.ChainMap( - *[ - self.add_primary_key(), - self.add_property_id(self.config["property_id"]), - self.add_dimensions(dimensions, row), - self.add_metrics(metrics, metrics_type_map, row), - ] - ) + chain_row = collections.ChainMap( + *[ + self.add_primary_key(), + self.add_property_id(self.config["property_id"]), + self.add_dimensions(dimensions, row), + self.add_metrics(metrics, metrics_type_map, row), + ] ) + rows.append(dict(chain_row)) r["records"] = rows yield r + def should_retry(self, response: requests.Response) -> bool: + if response.status_code == 429: + self.logger.info(f"{response.json()['error']['message']}. " + f"More info: https://developers.google.com/analytics/devguides/reporting/data/v1/quotas") + self._raise_on_http_errors = False + return False + return super(GoogleAnalyticsDataApiBaseStream, self).should_retry(response) + + +class FullRefreshGoogleAnalyticsDataApi(GoogleAnalyticsDataApiBaseStream, ABC): + def request_body_json( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Optional[Mapping]: + start_date = utils.string_to_date(self.config["date_ranges_start_date"]) + end_date = datetime.datetime.now().date() + return { + "metrics": [{"name": m} for m in self.config["metrics"]], + "dimensions": [{"name": d} for d in self.config["dimensions"]], + "dateRanges": [ + { + "startDate": utils.date_to_string(start_date), + "endDate": utils.date_to_string(end_date), + } + ] + } + class IncrementalGoogleAnalyticsDataApiStream(GoogleAnalyticsDataApiBaseStream, IncrementalMixin, ABC): - _date_format = "%Y-%m-%d" + _date_format: str = "%Y-%m-%d" def __init__(self, *args, **kwargs): super(IncrementalGoogleAnalyticsDataApiStream, self).__init__(*args, **kwargs) - self._cursor_value = None + self._cursor_value: str = "" class GoogleAnalyticsDataApiGenericStream(IncrementalGoogleAnalyticsDataApiStream): - _default_window_in_days = 1 - _record_date_format = "%Y%m%d" + _default_window_in_days: int = 1 + _record_date_format: str = "%Y%m%d" @property def cursor_field(self) -> Union[str, List[str]]: @@ -255,11 +292,24 @@ def cursor_field(self) -> Union[str, List[str]]: @property def state(self) -> MutableMapping[str, Any]: - return {self.cursor_field: self._cursor_value or utils.string_to_date(self.config["date_ranges_start_date"], self._date_format)} + if self._cursor_value: + return {self.cursor_field: self._cursor_value} + return { + self.cursor_field: utils.date_to_string(self._date_parse_probe(self.config["date_ranges_start_date"]), self._record_date_format) + } @state.setter - def state(self, value): - self._cursor_value = utils.string_to_date(value[self.cursor_field], self._date_format) + datetime.timedelta(days=1) + def state(self, value: dict): + self._cursor_value = utils.date_to_string( + self._date_parse_probe(value[self.cursor_field]) + datetime.timedelta(days=1), + self._record_date_format + ) + + def _date_parse_probe(self, date_string: str) -> datetime.date: + try: + return utils.string_to_date(date_string, self._record_date_format) + except ValueError: + return utils.string_to_date(date_string, self._date_format) def request_body_json( self, @@ -270,7 +320,7 @@ def request_body_json( return { "metrics": [{"name": m} for m in 
self.config["metrics"]], "dimensions": [{"name": d} for d in self.config["dimensions"]], - "dateRanges": [stream_slice], + "dateRanges": [stream_slice] } def read_records( @@ -284,9 +334,8 @@ def read_records( return [] records = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) for record in records: - for row in record["records"]: - next_cursor_value = utils.string_to_date(row[self.cursor_field], self._record_date_format) - self._cursor_value = max(self._cursor_value, next_cursor_value) if self._cursor_value else next_cursor_value + for row in record.get("records", []): + self._cursor_value: str = max(self._cursor_value, row[self.cursor_field]) if self._cursor_value else row[self.cursor_field] yield row def stream_slices( @@ -294,10 +343,10 @@ def stream_slices( ) -> Iterable[Optional[Mapping[str, Any]]]: dates = [] - today: datetime.date = datetime.date.today() - start_date: datetime.date = self.state[self.cursor_field] + today: datetime.date = datetime.date.today() + start_date: datetime.date = utils.string_to_date(self.state[self.cursor_field], self._record_date_format) - timedelta: int = self.config["window_in_days"] or self._default_window_in_days + timedelta: int = self.config.get("window_in_days", self._default_window_in_days) while start_date <= today: end_date: datetime.date = start_date + datetime.timedelta(days=timedelta) @@ -359,17 +408,87 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> return False, str(e) return True, None + def _validate_custom_reports(self, custom_reports, default_reports): + """ + Load, validate and return custom reports. Expect custom reports to be a json string of the following format: + [{"name": "", "dimensions": ["", ...], "metrics": ["", ...]}, ...] + :param custom_reports: custom reports to be validated + :return: custom reports + """ + # Custom report root object is of type list + if not isinstance(custom_reports, list): + raise TypeError(f"Expected Custom Reports to be a list of objects. Got {type(custom_reports)}.") + + # Report items are of type dict + incorrect_report_item_types = [(i, type(report)) for i, report in enumerate(custom_reports) if type(report) is not dict] + if any(incorrect_report_item_types): + raise TypeError("Expected Report item to be an object. " + ", ".join([f"Got {t} at report item {i}" for i, t in incorrect_report_item_types])) + + def validate_name(report: dict) -> Optional[str]: + """Report name is defined as a non-empty string. Returns key name if any problems""" + if not ( + "name" in report + and report["name"] + and type(report['name']) is str + ): + return "name" + + def validate_structure(report: dict) -> Optional[str]: + """Check that either `dimensions` or `metrics` present in report config. Returns an error string""" + if not (("dimensions" in report and report["dimensions"]) or ("metrics" in report and report["metrics"])): + return "non-empty `dimensions` or `metrics` must present" + + def validate_dimensions(report: dict) -> Optional[str]: + """Dimensions are defined as a list of strings. Returns key dimensions if any problems""" + if "dimensions" in report and report["dimensions"] and not ( + isinstance(report["dimensions"], list) + and all(type(d) is str and d for d in report["dimensions"]) + ): + return "dimensions" + + def validate_metrics(report: dict) -> Optional[str]: + """Metrics are defined as a list of strings. 
+        def validate_metrics(report: dict) -> Optional[str]:
+            """Metrics must be a list of strings. Returns the key `metrics` if there is a problem."""
+            if "metrics" in report and report["metrics"] and not (
+                isinstance(report["metrics"], list)
+                and all(type(m) is str and m for m in report["metrics"])
+            ):
+                return "metrics"
+
+        # Collect all invalid reports with their positions and invalid keys. Example: [(1, "name"), (3, "name", "metrics"), ...]
+        incorrect_report_item_fields = [
+            (i, *filter(lambda x: x, (validate_name(report), validate_structure(report), validate_dimensions(report), validate_metrics(report))))
+            for i, report in enumerate(custom_reports)
+            if any([validate_name(report), validate_structure(report), validate_dimensions(report), validate_metrics(report)])
+        ]
+        # Raise an error if any invalid reports were provided
+        if any(incorrect_report_item_fields):
+            msg = 'Report format: [{"name": "", "dimensions": ["", ...], "metrics": ["", ...]}, ...]'
+            errors = ", ".join([
+                f"Check {missing_fields} at report item {position + 1}"
+                for position, *missing_fields in incorrect_report_item_fields
+            ])
+            raise TypeError(f"{msg}.\n {errors}")
+
+        # Check that custom report names do not clash with the default report names
+        existing_names = set(map(itemgetter("name"), default_reports)).intersection(set(map(itemgetter("name"), custom_reports)))
+        if existing_names:
+            raise ValueError(f"Reports {existing_names} already exist as default reports.")
+
+        return custom_reports
+
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         authenticator = self.get_authenticator(config)
 
         reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
         if "custom_reports" in config:
-            custom_reports = json.loads(config["custom_reports"])
+            custom_reports = self._validate_custom_reports(json.loads(config["custom_reports"]), reports)
             reports += custom_reports
 
+        # Generate and instantiate a list of dynamically defined streams: [ type(<name>, (Bases,), {attrs})(*args, **kwargs), ... ]
+        # A stream is considered FullRefresh if `date` is not present in its dimensions; otherwise it is considered Incremental
         return [
-            type(report["name"], (GoogleAnalyticsDataApiGenericStream,), {})(
-                config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=authenticator
+            type(report["name"], (GoogleAnalyticsDataApiGenericStream if "date" in report.get("dimensions", []) else FullRefreshGoogleAnalyticsDataApi,), {})(
+                config=dict(**config, metrics=report.get("metrics", []), dimensions=report.get("dimensions", [])), authenticator=authenticator
            )
             for report in reports
         ]
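Note: streams() above builds one stream class per report at runtime with the three-argument type() call, picking the incremental base only when the report's dimensions contain "date". A minimal standalone sketch of that dispatch, using stand-in base classes (FullRefreshBase and IncrementalBase are illustrative names, not the connector's):

class FullRefreshBase:
    """Stand-in for FullRefreshGoogleAnalyticsDataApi."""
    def __init__(self, config):
        self.config = config

class IncrementalBase(FullRefreshBase):
    """Stand-in for GoogleAnalyticsDataApiGenericStream."""

def make_stream(report: dict, config: dict):
    # Pick the base class from the report's dimensions, then create a
    # new class named after the report and instantiate it immediately.
    base = IncrementalBase if "date" in report.get("dimensions", []) else FullRefreshBase
    cls = type(report["name"], (base,), {})
    return cls(config=dict(**config, metrics=report.get("metrics", []), dimensions=report.get("dimensions", [])))

stream = make_stream({"name": "devices", "dimensions": ["date", "deviceCategory"]}, {"property_id": "123"})
assert type(stream).__name__ == "devices"
assert isinstance(stream, IncrementalBase)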
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
index 29856c48a7652..58a668e048b44 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
@@ -84,8 +84,10 @@
     },
     "date_ranges_start_date": {
       "type": "string",
-      "title": "Date Range Start Date",
-      "description": "The start date. One of the values Ndaysago, yesterday, today or in the format YYYY-MM-DD",
+      "format": "date",
+      "default": "2022-12-01",
+      "title": "Reports replication start date",
+      "description": "The start date from which to begin replicating report data. Any data generated before this date will not be replicated in reports. This is a UTC date in YYYY-MM-DD format.",
       "order": 2
     },
     "custom_reports": {
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_authenticator.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_authenticator.py
new file mode 100644
index 0000000000000..356746e1d008a
--- /dev/null
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_authenticator.py
@@ -0,0 +1,56 @@
+import datetime
+
+from source_google_analytics_data_api import utils
+from source_google_analytics_data_api.authenticator import GoogleServiceKeyAuthenticator
+
+
+TEST_PRIVATE_KEY = """
+-----BEGIN RSA PRIVATE KEY-----
+MIIBPAIBAAJBAIy64eoS8VCwNnu6+kcyvRc7w/Cw20fnZWcpftLIl33ZWdXl/Q+W
+sEQkm2RRWO2R9CGC2bJRZYEbiAabuG4T1LkCAwEAAQJBAIbr5Qv1fUZOqu2VJb58
+9qz/r6ti49jcEGwHbH/JsPQFpXWTyKdonibIblLDB4AbZdR25zEM2k2G42OErWjJ
+0wECIQD21o4uOMJtAe7GLvg3AeQrb4t9sWlCPuNNxnmKfn8PfQIhAJH0F7vSyD7D
+UwRcMht0PU65zWUZArBaI7BlpmFhgNbtAiEA1rIZ6vQtkDjtIW37MYUwm+Milgo4
+vokKljx6vM5339UCIEb1Owyvn3cUEypNgHbkfmHl5zu9exct26gI42j4tGDJAiEA
+3uFuRTUY3W5s8hxyHcATsGhWfGa4VCSjlKE+sSchksc=
+-----END RSA PRIVATE KEY-----
+"""
+
+
+def test_authenticator(mocker):
+    requests = mocker.MagicMock()
+    requests.request.return_value.json.side_effect = [
+        {
+            "expires_in": GoogleServiceKeyAuthenticator._default_token_lifetime_secs,
+            "access_token": "ga-access-token-1"
+        },
+        {
+            "expires_in": GoogleServiceKeyAuthenticator._default_token_lifetime_secs,
+            "access_token": "ga-access-token-2"
+        }
+    ]
+
+    mocker.patch("source_google_analytics_data_api.authenticator.requests", requests)
+
+    authenticator = GoogleServiceKeyAuthenticator(credentials={
+        "client_email": "example-app@airbyte.com",
+        "private_key": TEST_PRIVATE_KEY,
+        "client_id": "c-airbyte-001"
+    })
+
+    request_object = mocker.MagicMock()
+    request_object.headers = {}
+
+    # The first call must fetch a fresh token
+    authenticator(request_object)
+    assert requests.request.call_count == 1
+    assert request_object.headers["Authorization"] == "Bearer ga-access-token-1"
+
+    # A second call within the token lifetime must reuse the cached token
+    authenticator(request_object)
+    assert requests.request.call_count == 1
+    assert request_object.headers["Authorization"] == "Bearer ga-access-token-1"
+
+    # Once the token has expired, a new one must be requested
+    authenticator._token["expires_at"] = utils.datetime_to_secs(datetime.datetime.utcnow()) - GoogleServiceKeyAuthenticator._default_token_lifetime_secs
+
+    authenticator(request_object)
+    assert requests.request.call_count == 2
+    assert request_object.headers["Authorization"] == "Bearer ga-access-token-2"
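Note: test_authenticator above pins down the caching contract — one token request per lifetime, and a refresh only after expires_at has passed. A minimal sketch of that expiry check, assuming an epoch-seconds expires_at field like the one the test manipulates (CachedToken is an illustrative class, not the connector's GoogleServiceKeyAuthenticator):

import time

class CachedToken:
    """Illustrative token cache; field names mirror the test, not the real class."""
    def __init__(self, fetch):
        self._fetch = fetch  # callable returning {"access_token": ..., "expires_in": ...}
        self._token = {}

    def get(self) -> str:
        # Refresh only when there is no token yet or it has expired.
        if not self._token or self._token["expires_at"] <= time.time():
            response = self._fetch()
            self._token = {
                "access_token": response["access_token"],
                "expires_at": time.time() + response["expires_in"],
            }
        return self._token["access_token"]

calls = iter([{"access_token": "t1", "expires_in": 3600}, {"access_token": "t2", "expires_in": 3600}])
token = CachedToken(lambda: next(calls))
assert token.get() == "t1" and token.get() == "t1"  # cached within lifetime
token._token["expires_at"] = time.time() - 1        # force expiry, like the test does
assert token.get() == "t2"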
+ "apiName": "browser", + "uiName": "Browser", + "description": "The browsers used to view your website.", + "category": "Platform / Device" + }, + { + "apiName": "city", + "uiName": "City", + "description": "The city from which the user activity originated.", + "category": "Geography" + }, + { + "apiName": "country", + "uiName": "Country", + "description": "The country from which the user activity originated.", + "category": "Geography" + }, + { + "apiName": "date", + "uiName": "Date", + "description": "The date of the event, formatted as YYYYMMDD.", + "category": "Time" + }, + { + "apiName": "deviceCategory", + "uiName": "Device category", + "description": "The type of device: Desktop, Tablet, or Mobile.", + "category": "Platform / Device" + }, + { + "apiName": "hostName", + "uiName": "Hostname", + "description": "Includes the subdomain and domain names of a URL; for example, the Host Name of www.example.com/contact.html is www.example.com.", + "category": "Page / Screen" + }, + { + "apiName": "operatingSystem", + "uiName": "Operating system", + "description": "The operating systems used by visitors to your app or website. Includes desktop and mobile operating systems such as Windows and Android.", + "category": "Platform / Device" + }, + { + "apiName": "pagePathPlusQueryString", + "uiName": "Page path + query string", + "description": "The portion of the URL following the hostname for web pages visited; for example, the pagePathPlusQueryString portion of https://www.example.com/store/contact-us?query_string=true is /store/contact-us?query_string=true.", + "category": "Page / Screen" + }, + { + "apiName": "region", + "uiName": "Region", + "description": "The geographic region from which the user activity originated, derived from their IP address.", + "category": "Geography" + }, + { + "apiName": "sessionMedium", + "uiName": "Session medium", + "description": "The medium that initiated a session on your website or app.", + "category": "Traffic Source" + }, + { + "apiName": "sessionSource", + "uiName": "Session source", + "description": "The source that initiated a session on your website or app.", + "category": "Traffic Source" + } + ], + "metrics": [ + { + "apiName": "active1DayUsers", + "uiName": "1-day active users", + "description": "The number of distinct active users on your site or app within a 1 day period. The 1 day period includes the last day in the report's date range. Note: this is the same as Active Users.", + "type": "TYPE_INTEGER", + "category": "User" + }, + { + "apiName": "active28DayUsers", + "uiName": "28-day active users", + "description": "The number of distinct active users on your site or app within a 28 day period. The 28 day period includes the last day in the report's date range.", + "type": "TYPE_INTEGER", + "category": "User" + }, + { + "apiName": "active7DayUsers", + "uiName": "7-day active users", + "description": "The number of distinct active users on your site or app within a 7 day period. The 7 day period includes the last day in the report's date range.", + "type": "TYPE_INTEGER", + "category": "User" + }, + { + "apiName": "averageSessionDuration", + "uiName": "Average session duration", + "description": "The average duration (in seconds) of users' sessions.", + "type": "TYPE_SECONDS", + "category": "Session" + }, + { + "apiName": "bounceRate", + "uiName": "Bounce rate", + "description": "The percentage of sessions that were not engaged ((Sessions Minus Engaged sessions) divided by Sessions). 
+    {
+      "apiName": "bounceRate",
+      "uiName": "Bounce rate",
+      "description": "The percentage of sessions that were not engaged ((Sessions Minus Engaged sessions) divided by Sessions). This metric is returned as a fraction; for example, 0.2761 means 27.61% of sessions were bounces.",
+      "type": "TYPE_FLOAT",
+      "category": "Session"
+    },
+    {
+      "apiName": "newUsers",
+      "uiName": "New users",
+      "description": "The number of users who interacted with your site or launched your app for the first time (event triggered: first_open or first_visit).",
+      "type": "TYPE_INTEGER",
+      "category": "User"
+    },
+    {
+      "apiName": "screenPageViews",
+      "uiName": "Views",
+      "description": "The number of app screens or web pages your users viewed. Repeated views of a single page or screen are counted. (screen_view + page_view events).",
+      "type": "TYPE_INTEGER",
+      "category": "Page / Screen"
+    },
+    {
+      "apiName": "screenPageViewsPerSession",
+      "uiName": "Views per session",
+      "description": "The number of app screens or web pages your users viewed per session. Repeated views of a single page or screen are counted. (screen_view + page_view events) / sessions.",
+      "type": "TYPE_FLOAT",
+      "category": "Page / Screen"
+    },
+    {
+      "apiName": "sessions",
+      "uiName": "Sessions",
+      "description": "The number of sessions that began on your site or app (event triggered: session_start).",
+      "type": "TYPE_INTEGER",
+      "category": "Session"
+    },
+    {
+      "apiName": "sessionsPerUser",
+      "uiName": "Sessions per user",
+      "description": "The average number of sessions per user (Sessions divided by Active Users).",
+      "type": "TYPE_FLOAT",
+      "category": "Session"
+    },
+    {
+      "apiName": "totalUsers",
+      "uiName": "Total users",
+      "description": "The number of distinct users who have logged at least one event, regardless of whether the site or app was in use when that event was logged.",
+      "type": "TYPE_INTEGER",
+      "category": "User"
+    }
+  ],
+  "name": "properties/496180525/metadata"
+}
+"""
+
 
 @pytest.fixture
 def patch_base_class(mocker):
@@ -50,7 +207,8 @@ def patch_base_class(mocker):
             "screenPageViewsPerSession",
             "bounceRate",
         ],
-        "date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
+        "date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=3)), "%Y-%m-%d"),
+        "window_in_days": 1
     }
 }
@@ -239,6 +397,100 @@ def test_parse_response(patch_base_class):
     assert actual_records == expected_data
 
 
+def test_metadata_retrieved_once(patch_base_class, mocker):
+    read_records_mock = mocker.MagicMock()
+    read_records_mock.return_value = [json.loads(json_metadata_response)]
+    mocker.patch.object(GoogleAnalyticsDataApiTestConnectionStream, "read_records", read_records_mock)
+
+    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    metadata_one = stream.metadata
+    metadata_two = stream.metadata
+
+    assert metadata_one is metadata_two
+    assert read_records_mock.call_count == 1
+
+
+def test_json_schema(patch_base_class, mocker):
+    read_records_mock = mocker.MagicMock()
+    read_records_mock.return_value = [json.loads(json_metadata_response)]
+    mocker.patch.object(GoogleAnalyticsDataApiTestConnectionStream, "read_records", read_records_mock)
+
+    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    json_schema_properties = stream.get_json_schema().get("properties")
+    metadata = json.loads(json_metadata_response)
+
+    metrics_metadata = {m["apiName"]: m for m in metadata["metrics"]}
+    for metric in stream.config["metrics"]:
+        assert metric in json_schema_properties
+        assert metrics_metadata[metric]["description"] == json_schema_properties[metric]["description"]
+
+    dimensions_metadata = {d["apiName"]: d for d in metadata["dimensions"]}
+    for dimension in stream.config["dimensions"]:
+        assert dimension in json_schema_properties
+        assert dimensions_metadata[dimension]["description"] == json_schema_properties[dimension]["description"]
+
+    additional_properties = ["property_id", "uuid"]
+    for additional_property in additional_properties:
+        assert additional_property in json_schema_properties
+
+    assert len(stream.config["dimensions"]) + len(stream.config["metrics"]) + len(additional_properties) == len(json_schema_properties)
+
+
+def test_stream_slices(patch_base_class):
+    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream_slices = stream.stream_slices(sync_mode=SyncMode.incremental)
+    assert len(stream_slices) == 2
+
+    latest_date = datetime.date.today()
+    formatted_date_diff = lambda date, days: (date - datetime.timedelta(days=days)).strftime("%Y-%m-%d")
+    assert stream_slices[0] == {"startDate": formatted_date_diff(latest_date, 3), "endDate": formatted_date_diff(latest_date, 2)}
+    assert stream_slices[1] == {"startDate": formatted_date_diff(latest_date, 1), "endDate": formatted_date_diff(latest_date, 0)}
+
+
+def test_read_records(patch_base_class, mocker):
+    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    initial_date = datetime.datetime.strptime(patch_base_class["config"]["date_ranges_start_date"], "%Y-%m-%d")
+    formatted_date_diff = lambda date, days: (date + datetime.timedelta(days=days)).strftime("%Y%m%d")
+
+    read_records = mocker.MagicMock()
+    expected_records = [
+        [
+            {
+                "records": [
+                    {"date": formatted_date_diff(initial_date, 0)},
+                    {"date": formatted_date_diff(initial_date, 1)},
+                ]
+            }
+        ],
+        [
+            {
+                "records": [
+                    {"date": formatted_date_diff(initial_date, 2)},
+                    {"date": formatted_date_diff(initial_date, 3)},
+                ]
+            }
+        ]
+    ]
+    read_records.side_effect = expected_records
+    mocker.patch.object(HttpStream, "read_records", read_records)
+
+    reader = stream.read_records(
+        sync_mode=SyncMode.incremental,
+        stream_slice={"startDate": formatted_date_diff(initial_date, 0), "endDate": formatted_date_diff(initial_date, 1)}
+    )
+    assert stream._cursor_value == ""
+    for i, _ in enumerate(reader):
+        assert stream._cursor_value == formatted_date_diff(initial_date, i)
+
+    reader = stream.read_records(
+        sync_mode=SyncMode.incremental,
+        stream_slice={"startDate": formatted_date_diff(initial_date, 2), "endDate": formatted_date_diff(initial_date, 3)}
+    )
+    assert stream._cursor_value == formatted_date_diff(initial_date, 1)
+    for j, _ in enumerate(reader):
+        assert stream._cursor_value == formatted_date_diff(initial_date, i + 1 + j)
+
+
 def test_request_headers(patch_base_class):
     stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
     inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
@@ -257,7 +509,7 @@ def test_http_method(patch_base_class):
     [
         (HTTPStatus.OK, False),
         (HTTPStatus.BAD_REQUEST, False),
-        (HTTPStatus.TOO_MANY_REQUESTS, True),
+        (HTTPStatus.TOO_MANY_REQUESTS, False),
         (HTTPStatus.INTERNAL_SERVER_ERROR, True),
     ],
 )
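Note: for reference, the windowing that test_stream_slices asserts — each slice spans window_in_days days, and the next slice starts the day after the previous one ends, so consecutive slices never overlap. A minimal standalone sketch reproducing the expected slices (make_slices is an illustrative helper, not the stream method itself):

import datetime

def make_slices(start_date: datetime.date, today: datetime.date, window_in_days: int = 1):
    # Each slice covers [start_date, start_date + window_in_days], capped at today;
    # the next slice begins the day after the previous slice's end date.
    slices = []
    while start_date <= today:
        end_date = min(start_date + datetime.timedelta(days=window_in_days), today)
        slices.append({"startDate": start_date.isoformat(), "endDate": end_date.isoformat()})
        start_date = end_date + datetime.timedelta(days=1)
    return slices

today = datetime.date.today()
slices = make_slices(today - datetime.timedelta(days=3), today)
assert slices == [
    {"startDate": (today - datetime.timedelta(days=3)).isoformat(), "endDate": (today - datetime.timedelta(days=2)).isoformat()},
    {"startDate": (today - datetime.timedelta(days=1)).isoformat(), "endDate": today.isoformat()},
]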