Source Google Analytics Data API: prepare connector for certification #20769

Closed
@@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.0.3
LABEL io.airbyte.version=0.0.4
LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
@@ -1,31 +1,39 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-google-analytics-data-api:dev
tests:
acceptance_tests:
spec:
tests:
- spec_path: "source_google_analytics_data_api/spec.json"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.0.2"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
tests:
- config_path: "secrets/config.json"
empty_streams: []
incremental:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
"daily_active_users": ["uuid"]
"weekly_active_users": ["uuid"]
"four_weekly_active_users": ["uuid"]
"devices": ["uuid"]
"locations": ["uuid"]
"pages": ["uuid"]
"traffic_sources": ["uuid"]
"website_overview": ["uuid"]
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
"daily_active_users": ["uuid"]
"weekly_active_users": ["uuid"]
"four_weekly_active_users": ["uuid"]
"devices": ["uuid"]
"locations": ["uuid"]
"pages": ["uuid"]
"traffic_sources": ["uuid"]
"website_overview": ["uuid"]
@@ -0,0 +1,10 @@
{
"daily_active_users": {"date": "20501201"},
"weekly_active_users": {"date": "20501201"},
"four_weekly_active_users": {"date": "20501201"},
"devices": {"date": "20501201"},
"locations": {"date": "20501201"},
"pages": {"date": "20501201"},
"traffic_sources": {"date": "20501201"},
"website_overview": {"date": "20501201"}
}
@@ -0,0 +1,10 @@
{
"daily_active_users": {"date": "20221201"},
"weekly_active_users": {"date": "20221201"},
"four_weekly_active_users": {"date": "20221201"},
"devices": {"date": "20221201"},
"locations": {"date": "20221201"},
"pages": {"date": "20221201"},
"traffic_sources": {"date": "20221201"},
"website_overview": {"date": "20221201"}
}
@@ -9,6 +9,7 @@
import pkgutil
import uuid
from abc import ABC
from operator import itemgetter
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import requests
@@ -120,11 +121,16 @@ class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC):
def __init__(self, config: Mapping[str, Any], *args, **kwargs):
super().__init__(*args, **kwargs)
self._config = config
self._raise_on_http_errors = True

@property
def config(self):
return self._config

@property
def raise_on_http_errors(self):
return self._raise_on_http_errors


class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream):
row_limit = 100000
@@ -145,7 +151,7 @@ def add_property_id(property_id):

@staticmethod
def add_dimensions(dimensions, row) -> dict:
return dict(zip(dimensions, [v["value"] for v in row["dimensionValues"]]))
return dict(zip(dimensions, [v["value"] for v in row.get("dimensionValues", [])]))

@staticmethod
def add_metrics(metrics, metric_types, row) -> dict:
@@ -154,7 +160,7 @@ def _metric_type_to_python(metric_data: Tuple[str, str]) -> Any:
python_type = metrics_type_to_python(metric_types[metric_name])
return metric_name, python_type(metric_value)

return dict(map(_metric_type_to_python, zip(metrics, [v["value"] for v in row["metricValues"]])))
return dict(map(_metric_type_to_python, zip(metrics, [v["value"] for v in row.get("metricValues", [])])))
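For context, `metrics_type_to_python` is defined elsewhere in the connector (its import is not part of this hunk), so the sketch below only illustrates the kind of mapping it is assumed to perform, using the standard GA4 Data API metric type names and the same tolerant `.get("metricValues", [])` access introduced above. Names and the exact mapping are assumptions, not taken from the real `utils` module.

```python
from typing import Any, Dict, List


def metrics_type_to_python(metric_type: str) -> Any:
    # Assumed mapping: known numeric GA4 types are cast, anything else stays a string.
    return {"TYPE_INTEGER": int, "TYPE_FLOAT": float}.get(metric_type, str)


def add_metrics(metrics: List[str], metric_types: Dict[str, str], row: dict) -> dict:
    # Tolerate rows without metricValues, mirroring the change in the diff above.
    values = [v["value"] for v in row.get("metricValues", [])]
    return {name: metrics_type_to_python(metric_types[name])(value) for name, value in zip(metrics, values)}


row = {"metricValues": [{"value": "42"}, {"value": "3.5"}]}
print(add_metrics(["activeUsers", "bounceRate"], {"activeUsers": "TYPE_INTEGER", "bounceRate": "TYPE_FLOAT"}, row))
# {'activeUsers': 42, 'bounceRate': 3.5}
```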

def get_json_schema(self) -> Mapping[str, Any]:
"""
@@ -213,53 +219,97 @@ def parse_response(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
if not response.ok:
return {}

r = response.json()

dimensions = [h["name"] for h in r["dimensionHeaders"]]
metrics = [h["name"] for h in r["metricHeaders"]]
metrics_type_map = {h["name"]: h["type"] for h in r["metricHeaders"]}
dimensions = [h["name"] for h in r.get("dimensionHeaders", [])]
metrics = [h["name"] for h in r.get("metricHeaders", [])]
metrics_type_map = {h["name"]: h["type"] for h in r.get("metricHeaders", [])}

rows = []

for row in r.get("rows", []):
rows.append(
collections.ChainMap(
*[
self.add_primary_key(),
self.add_property_id(self.config["property_id"]),
self.add_dimensions(dimensions, row),
self.add_metrics(metrics, metrics_type_map, row),
]
)
chain_row = collections.ChainMap(
*[
self.add_primary_key(),
self.add_property_id(self.config["property_id"]),
self.add_dimensions(dimensions, row),
self.add_metrics(metrics, metrics_type_map, row),
]
)
rows.append(dict(chain_row))
r["records"] = rows

yield r
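The per-row record is now materialized with `dict(chain_row)`, presumably so the emitted records are plain, JSON-serializable dicts rather than lazy `ChainMap` views. A minimal sketch of that merge; the layer contents are illustrative stand-ins for `add_primary_key` / `add_property_id` / `add_dimensions` / `add_metrics`, not real report data.

```python
import collections
import uuid

# Illustrative layers, mirroring the four helpers used in parse_response above.
primary_key = {"uuid": str(uuid.uuid4())}
property_id = {"property_id": "123456789"}   # placeholder property id
dimensions = {"date": "20221201"}
metrics = {"activeUsers": 42}

chain_row = collections.ChainMap(primary_key, property_id, dimensions, metrics)
record = dict(chain_row)  # flatten into one plain dict before appending to r["records"]
print(record)  # one flat, JSON-serializable record containing all four layers
```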

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 429:
self.logger.info(f"{response.json()['error']['message']}. "
f"More info: https://developers.google.com/analytics/devguides/reporting/data/v1/quotas")
self._raise_on_http_errors = False
return False
return super(GoogleAnalyticsDataApiBaseStream, self).should_retry(response)
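Setting `_raise_on_http_errors = False` on a 429 works together with the new `raise_on_http_errors` property: the CDK's `HttpStream` checks that property before raising on a failed response, so the stream logs the quota message and ends the request without records instead of failing the sync. Below is a stripped-down, stand-alone sketch of the same pattern outside the Airbyte CDK; class and method wiring are illustrative only.

```python
import logging

import requests

logger = logging.getLogger("quota-aware-stream")


class QuotaAwareStream:
    """Toy stand-in for the HttpStream pattern used in the diff above."""

    def __init__(self) -> None:
        self._raise_on_http_errors = True

    @property
    def raise_on_http_errors(self) -> bool:
        return self._raise_on_http_errors

    def should_retry(self, response: requests.Response) -> bool:
        if response.status_code == 429:
            logger.info("Quota exhausted, skipping instead of retrying. "
                        "More info: https://developers.google.com/analytics/devguides/reporting/data/v1/quotas")
            self._raise_on_http_errors = False  # downgrade the eventual 429 from an error to a no-op
            return False
        return 500 <= response.status_code < 600  # retry server errors, as the CDK does by default

    def handle(self, response: requests.Response) -> None:
        # Approximation of the CDK flow: no retry and errors enabled -> raise.
        if not self.should_retry(response) and self.raise_on_http_errors:
            response.raise_for_status()
```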


class FullRefreshGoogleAnalyticsDataApi(GoogleAnalyticsDataApiBaseStream, ABC):
def request_body_json(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Optional[Mapping]:
start_date = utils.string_to_date(self.config["date_ranges_start_date"])
end_date = datetime.datetime.now().date()
return {
"metrics": [{"name": m} for m in self.config["metrics"]],
"dimensions": [{"name": d} for d in self.config["dimensions"]],
"dateRanges": [
{
"startDate": utils.date_to_string(start_date),
"endDate": utils.date_to_string(end_date),
}
]
}
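A body like the one built above is what the GA4 Data API `runReport` endpoint accepts. The hedged sketch below shows the equivalent raw request against the public v1beta REST path; the property id, token, metric and dimension names are placeholders, and the connector itself sends this body through the CDK's `HttpStream` rather than calling `requests` directly.

```python
import datetime

import requests

property_id = "123456789"   # placeholder GA4 property id
access_token = "ya29...."   # placeholder OAuth2 / service-account access token

body = {
    "metrics": [{"name": "totalUsers"}],
    "dimensions": [{"name": "country"}],  # no "date" dimension -> handled by the full-refresh stream
    "dateRanges": [{"startDate": "2022-12-01", "endDate": datetime.date.today().isoformat()}],
}

resp = requests.post(
    f"https://analyticsdata.googleapis.com/v1beta/properties/{property_id}:runReport",
    json=body,
    headers={"Authorization": f"Bearer {access_token}"},
)
print(resp.json().get("rows", []))
```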


class IncrementalGoogleAnalyticsDataApiStream(GoogleAnalyticsDataApiBaseStream, IncrementalMixin, ABC):
_date_format = "%Y-%m-%d"
_date_format: str = "%Y-%m-%d"

def __init__(self, *args, **kwargs):
super(IncrementalGoogleAnalyticsDataApiStream, self).__init__(*args, **kwargs)
self._cursor_value = None
self._cursor_value: str = ""


class GoogleAnalyticsDataApiGenericStream(IncrementalGoogleAnalyticsDataApiStream):
_default_window_in_days = 1
_record_date_format = "%Y%m%d"
_default_window_in_days: int = 1
_record_date_format: str = "%Y%m%d"

@property
def cursor_field(self) -> Union[str, List[str]]:
return "date"

@property
def state(self) -> MutableMapping[str, Any]:
return {self.cursor_field: self._cursor_value or utils.string_to_date(self.config["date_ranges_start_date"], self._date_format)}
if self._cursor_value:
return {self.cursor_field: self._cursor_value}
return {
self.cursor_field: utils.date_to_string(self._date_parse_probe(self.config["date_ranges_start_date"]), self._record_date_format)
}

@state.setter
def state(self, value):
self._cursor_value = utils.string_to_date(value[self.cursor_field], self._date_format) + datetime.timedelta(days=1)
def state(self, value: dict):
self._cursor_value = utils.date_to_string(
self._date_parse_probe(value[self.cursor_field]) + datetime.timedelta(days=1),
self._record_date_format
)

def _date_parse_probe(self, date_string: str) -> datetime.date:
try:
return utils.string_to_date(date_string, self._record_date_format)
except ValueError:
return utils.string_to_date(date_string, self._date_format)
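`_date_parse_probe` exists because the cursor can arrive in either format: state written by earlier connector versions (and the spec's start date) uses `YYYY-MM-DD` (`_date_format`), while report rows and the new state use `YYYYMMDD` (`_record_date_format`). A small self-contained sketch of the same try-both-formats approach, assuming `utils.string_to_date` is a thin wrapper over `datetime.strptime`:

```python
import datetime

_RECORD_DATE_FORMAT = "%Y%m%d"   # format of GA4 rows and of the new state
_DATE_FORMAT = "%Y-%m-%d"        # format used by older state / the spec default


def date_parse_probe(date_string: str) -> datetime.date:
    """Try the record format first, then fall back to the ISO-style format."""
    try:
        return datetime.datetime.strptime(date_string, _RECORD_DATE_FORMAT).date()
    except ValueError:
        return datetime.datetime.strptime(date_string, _DATE_FORMAT).date()


assert date_parse_probe("20221201") == datetime.date(2022, 12, 1)    # new-style state
assert date_parse_probe("2022-12-01") == datetime.date(2022, 12, 1)  # legacy state / spec default
```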

def request_body_json(
self,
@@ -270,7 +320,7 @@ def request_body_json(
return {
"metrics": [{"name": m} for m in self.config["metrics"]],
"dimensions": [{"name": d} for d in self.config["dimensions"]],
"dateRanges": [stream_slice],
"dateRanges": [stream_slice]
}

def read_records(
@@ -284,20 +334,19 @@ def read_records(
return []
records = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
for record in records:
for row in record["records"]:
next_cursor_value = utils.string_to_date(row[self.cursor_field], self._record_date_format)
self._cursor_value = max(self._cursor_value, next_cursor_value) if self._cursor_value else next_cursor_value
for row in record.get("records", []):
self._cursor_value: str = max(self._cursor_value, row[self.cursor_field]) if self._cursor_value else row[self.cursor_field]
yield row
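Keeping the cursor as a `YYYYMMDD` string and taking `max` of strings is safe here because the format is zero-padded and most-significant-first, so lexicographic order matches chronological order. A one-line check of that assumption:

```python
# Lexicographic max equals chronological max for zero-padded YYYYMMDD strings.
assert max("20221201", "20230102", "20221130") == "20230102"
```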

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
dates = []

today: datetime.date = datetime.date.today()
start_date: datetime.date = self.state[self.cursor_field]
today: datetime.date = datetime.date.today()
start_date: datetime.date = utils.string_to_date(self.state[self.cursor_field], self._record_date_format)

timedelta: int = self.config["window_in_days"] or self._default_window_in_days
timedelta: int = self.config.get("window_in_days", self._default_window_in_days)

while start_date <= today:
end_date: datetime.date = start_date + datetime.timedelta(days=timedelta)
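The remainder of `stream_slices` is collapsed in this diff, but the loop above slices the replication window into `window_in_days`-sized date ranges from the saved cursor up to today. Below is a minimal stand-alone sketch of that kind of slicing; the clamping to today and the exact slice shape are assumptions rather than copies of the hidden lines.

```python
import datetime
from typing import Iterable, Mapping


def date_slices(start_date: datetime.date, window_in_days: int = 1) -> Iterable[Mapping[str, str]]:
    """Yield {'startDate', 'endDate'} ranges from start_date up to today (assumed slice shape)."""
    today = datetime.date.today()
    while start_date <= today:
        end_date = min(start_date + datetime.timedelta(days=window_in_days), today)
        yield {"startDate": start_date.isoformat(), "endDate": end_date.isoformat()}
        start_date = end_date + datetime.timedelta(days=1)


for s in date_slices(datetime.date.today() - datetime.timedelta(days=3), window_in_days=2):
    print(s)  # prints the generated date-range slices
```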
@@ -359,17 +408,87 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
return False, str(e)
return True, None

def _validate_custom_reports(self, custom_reports, default_reports):
"""
Load, validate and return custom reports. Expect custom reports to be a json string of the following format:
[{"name": "<report name>", "dimensions": ["<dimension-name>", ...], "metrics": ["<metric-name>", ...]}, ...]
:param custom_reports: custom reports to be validated
:return: custom reports
"""
# Custom report root object is of type list
if not isinstance(custom_reports, list):
raise TypeError(f"Expected Custom Reports to be a list of objects. Got {type(custom_reports)}.")

# Report items are of type dict
incorrect_report_item_types = [(i, type(report)) for i, report in enumerate(custom_reports) if type(report) is not dict]
if any(incorrect_report_item_types):
raise TypeError("Expected Report item to be an object. " + ", ".join([f"Got {t} at report item {i}" for i, t in incorrect_report_item_types]))

def validate_name(report: dict) -> Optional[str]:
"""Report name is defined as a non-empty string. Returns key name if any problems"""
if not (
"name" in report
and report["name"]
and type(report['name']) is str
):
return "name"

def validate_structure(report: dict) -> Optional[str]:
"""Check that either `dimensions` or `metrics` present in report config. Returns an error string"""
if not (("dimensions" in report and report["dimensions"]) or ("metrics" in report and report["metrics"])):
return "non-empty `dimensions` or `metrics` must present"

def validate_dimensions(report: dict) -> Optional[str]:
"""Dimensions are defined as a list of strings. Returns key dimensions if any problems"""
if "dimensions" in report and report["dimensions"] and not (
isinstance(report["dimensions"], list)
and all(type(d) is str and d for d in report["dimensions"])
):
return "dimensions"

def validate_metrics(report: dict) -> Optional[str]:
"""Metrics are defined as a list of strings. Returns key metrics if any problems"""
if "metrics" in report and report["metrics"] and not (
isinstance(report["metrics"], list)
and all(type(m) is str and m for m in report["metrics"])
):
return "metrics"

# Collect all invalid reports with their positions and invalid keys. Example: [(1, "name"), (3, "name", "metrics"), ...]
incorrect_report_item_fields = [
(i, *filter(lambda x: x, (validate_name(report), validate_structure(report), validate_dimensions(report), validate_metrics(report))))
for i, report in enumerate(custom_reports)
if any([validate_name(report), validate_structure(report), validate_dimensions(report), validate_metrics(report)])
]
# Raise an error if any invalid reports provided
if any(incorrect_report_item_fields):
msg = 'Report format: [{"name": "<report name>", "dimensions": ["<dimension-name>", ...], "metrics": ["<metric-name>", ...]}, ...]'
errors = ", ".join([
f"Check {missing_fields} at report item {position + 1}"
for position, *missing_fields in incorrect_report_item_fields
])
raise TypeError(f'{msg}.\n {errors}')

# Check if custom report names unique
existing_names = set(map(itemgetter("name"), default_reports)).intersection(set(map(itemgetter("name"), custom_reports)))
if existing_names:
raise ValueError(f"Reports {existing_names} already exist as a default reports.")

return custom_reports
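As a usage illustration, the validator accepts a JSON list like the one below and rejects anything that is not a list of objects with a non-empty `name` plus at least one of `dimensions`/`metrics`, or whose name collides with a default report. The report, dimension, and metric names here are made up for the example.

```python
import json

# A custom_reports value that should pass validation: a JSON list of objects,
# each with a non-empty name and at least one of dimensions / metrics.
valid_custom_reports = json.loads(
    '[{"name": "sessions_by_country", "dimensions": ["date", "country"], "metrics": ["sessions"]}]'
)
print(valid_custom_reports)

# Values that should be rejected (illustrative, error wording paraphrased):
#   '{"name": "not_a_list"}'                              -> TypeError: root must be a list
#   '[{"dimensions": ["date"]}]'                          -> TypeError: missing / empty "name"
#   '[{"name": "report_without_fields"}]'                 -> TypeError: needs dimensions or metrics
#   '[{"name": "daily_active_users", "metrics": ["x"]}]'  -> ValueError: clashes with a default report
```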

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self.get_authenticator(config)

reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
if "custom_reports" in config:
custom_reports = json.loads(config["custom_reports"])
custom_reports = self._validate_custom_reports(json.loads(config["custom_reports"]), reports)
reports += custom_reports

# Generate and instantiate a list of dynamically defined streams: [ type(<name>, (Bases,), {attrs})(*args, **kwargs), ... ]
# The base stream is considered to be FullRefresh if `date` is not present in the report's dimensions, otherwise it is considered to be Incremental
return [
type(report["name"], (GoogleAnalyticsDataApiGenericStream,), {})(
config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=authenticator
type(report["name"], (GoogleAnalyticsDataApiGenericStream if "date" in report.get("dimensions", []) else FullRefreshGoogleAnalyticsDataApi,), {})(
config=dict(**config, metrics=report.get("metrics", []), dimensions=report.get("dimensions", [])), authenticator=authenticator
)
for report in reports
]
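The `type(...)` call creates one stream class per report at runtime, now choosing the base class from whether the report's dimensions include `date`. A self-contained sketch of that dynamic-class pattern with stand-in base classes and one hypothetical custom report:

```python
class FullRefreshBase:
    sync = "full_refresh"


class IncrementalBase:
    sync = "incremental"


reports = [
    {"name": "website_overview", "dimensions": ["date"], "metrics": ["totalUsers"]},
    {"name": "countries", "dimensions": ["country"], "metrics": ["sessions"]},  # hypothetical custom report
]

streams = [
    type(report["name"], (IncrementalBase if "date" in report.get("dimensions", []) else FullRefreshBase,), {})()
    for report in reports
]
for stream in streams:
    print(type(stream).__name__, stream.sync)
# website_overview incremental
# countries full_refresh
```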
@@ -84,8 +84,10 @@
},
"date_ranges_start_date": {
"type": "string",
"title": "Date Range Start Date",
"description": "The start date. One of the values Ndaysago, yesterday, today or in the format YYYY-MM-DD",
"format": "date",
"default": "2022-12-01",
"title": "Reports replication start date",
"description": "The start date from which to begin replicating report data. Any data generated before this date will not be replicated in reports. This is a UTC date in YYYY-MM-DD format.",
"order": 2
},
"custom_reports": {