-
Notifications
You must be signed in to change notification settings - Fork 4.5k
🐛 Source Facebook Marketing: Convert values' types according to schema types #4978
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
f926982
1d7d331
436f7ea
e235bf1
1b54589
c0dbf49
65690aa
e123327
12ea2d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
from abc import ABC | ||
from collections import deque | ||
from datetime import datetime | ||
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence | ||
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union | ||
|
||
import backoff | ||
import pendulum | ||
|
@@ -46,7 +46,7 @@ | |
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) | ||
|
||
|
||
def remove_params_from_url(url: str, params: [str]) -> str: | ||
def remove_params_from_url(url: str, params: List[str]) -> str: | ||
""" | ||
Parses a URL and removes the query parameters specified in params | ||
:param url: URL | ||
|
@@ -110,7 +110,63 @@ def read_records( | |
) -> Iterable[Mapping[str, Any]]: | ||
"""Main read method used by CDK""" | ||
for record in self._read_records(params=self.request_params(stream_state=stream_state)): | ||
yield self._extend_record(record, fields=self.fields) | ||
yield self.transform(self._extend_record(record, fields=self.fields)) | ||
|
||
def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: | ||
""" | ||
Use this method to remove update fields types in record according to schema. | ||
""" | ||
schema = self.get_json_schema() | ||
self.convert_to_schema_types(record, schema["properties"]) | ||
return record | ||
|
||
def get_python_type(self, _types: Union[list, str]) -> tuple: | ||
"""Converts types from schema to python types. Examples: | ||
- `["string", "null"]` will be converted to `(str,)` | ||
- `["array", "string", "null"]` will be converted to `(list, str,)` | ||
- `"boolean"` will be converted to `(bool,)` | ||
""" | ||
types_mapping = { | ||
"string": str, | ||
"number": float, | ||
"integer": int, | ||
"object": dict, | ||
"array": list, | ||
"boolean": bool, | ||
} | ||
|
||
if isinstance(_types, list): | ||
return tuple([types_mapping[t] for t in _types if t != "null"]) | ||
|
||
return (types_mapping[_types],) | ||
|
||
def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]): | ||
""" | ||
Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value | ||
and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string. | ||
This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all | ||
types from schema. | ||
""" | ||
if not schema: | ||
return | ||
|
||
for key, value in record.items(): | ||
if key not in schema: | ||
continue | ||
|
||
if isinstance(value, dict): | ||
self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {})) | ||
elif isinstance(value, list) and "items" in schema[key]: | ||
for record_list_item in value: | ||
if list in self.get_python_type(schema[key]["items"]["type"]): | ||
# TODO Currently we don't have support for list of lists. | ||
pass | ||
elif dict in self.get_python_type(schema[key]["items"]["type"]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the expression There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, we can't do that. |
||
self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"]) | ||
elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): | ||
record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) | ||
elif not isinstance(value, self.get_python_type(schema[key]["type"])): | ||
record[key] = self.get_python_type(schema[key]["type"])[0](value) | ||
|
||
def _read_records(self, params: Mapping[str, Any]) -> Iterable: | ||
"""Wrapper around query to backoff errors. | ||
|
@@ -298,7 +354,7 @@ class AdsInsights(FBMarketingIncrementalStream): | |
MAX_WAIT_TO_START = pendulum.duration(minutes=5) | ||
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30) | ||
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5) | ||
MAX_ASYNC_JOBS = 3 | ||
MAX_ASYNC_JOBS = 10 | ||
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30) | ||
|
||
action_breakdowns = ALL_ACTION_BREAKDOWNS | ||
|
@@ -325,7 +381,7 @@ def read_records( | |
# because we query `lookback_window` days before actual cursor we might get records older then cursor | ||
|
||
for obj in result.get_result(): | ||
yield obj.export_all_data() | ||
yield self.transform(obj.export_all_data()) | ||
|
||
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: | ||
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. | ||
|
@@ -356,7 +412,7 @@ def wait_for_job(self, job) -> AdReportRun: | |
job = job.api_get() | ||
job_progress_pct = job["async_percent_completion"] | ||
job_id = job["report_run_id"] | ||
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete") | ||
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})") | ||
runtime = pendulum.now() - start_time | ||
|
||
if job["async_status"] == "Job Completed": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we return the 0th element in this method instead of from the calling code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_python_type()
function converts types from schema to python types. This means that we must return values only fromtypes_mapping
dictionary.