Skip to content

Commit f926982

Browse files
committed
Convert values' types according to schema types
1 parent 589d535 commit f926982

File tree

8 files changed

+86
-79
lines changed

8 files changed

+86
-79
lines changed

airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml

+6-5
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@ tests:
88
- config_path: "secrets/config.json"
99
status: "succeed"
1010
- config_path: "integration_tests/invalid_config.json"
11-
status: "exception"
11+
status: "failed"
1212
discovery:
1313
- config_path: "secrets/config.json"
1414
basic_read:
1515
- config_path: "secrets/config.json"
1616
configured_catalog_path: "integration_tests/configured_catalog.json"
17-
# FB serializes numeric fields as strings
18-
validate_schema: no
17+
timeout_seconds: 600
1918
incremental:
2019
- config_path: "secrets/config.json"
2120
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
22-
future_state_path: "integration_tests/abnormal_state.json"
21+
future_state_path: "integration_tests/future_state.json"
2322
full_refresh:
2423
- config_path: "secrets/config.json"
25-
configured_catalog_path: "integration_tests/configured_catalog.json"
24+
# TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021
25+
# because all running campaigns should be finished by that time.
26+
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"

airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json

-30
Original file line numberDiff line numberDiff line change
@@ -59,36 +59,6 @@
5959
"cursor_field": null,
6060
"destination_sync_mode": "append",
6161
"primary_key": null
62-
},
63-
{
64-
"stream": {
65-
"name": "ads_insights",
66-
"json_schema": {},
67-
"supported_sync_modes": ["full_refresh", "incremental"],
68-
"source_defined_cursor": true,
69-
"default_cursor_field": ["date_start"],
70-
"source_defined_primary_key": null,
71-
"namespace": null
72-
},
73-
"sync_mode": "incremental",
74-
"cursor_field": ["date_start"],
75-
"destination_sync_mode": "append",
76-
"primary_key": null
77-
},
78-
{
79-
"stream": {
80-
"name": "ads_insights_age_and_gender",
81-
"json_schema": {},
82-
"supported_sync_modes": ["full_refresh", "incremental"],
83-
"source_defined_cursor": true,
84-
"default_cursor_field": ["date_start"],
85-
"source_defined_primary_key": null,
86-
"namespace": null
87-
},
88-
"sync_mode": "incremental",
89-
"cursor_field": ["date_start"],
90-
"destination_sync_mode": "append",
91-
"primary_key": null
9262
}
9363
]
9464
}
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,42 @@
11
{
22
"campaigns": {
3-
"updated_time": "2021-07-25T13:34:26Z",
3+
"updated_time": "2121-07-25T13:34:26Z",
44
"include_deleted": true
55
},
66
"ad_creatives": {
7-
"updated_time": "2021-07-25T13:34:26Z",
7+
"updated_time": "2121-07-25T13:34:26Z",
88
"include_deleted": true
99
},
1010
"ad_sets": {
11-
"updated_time": "2021-07-25T13:34:26Z",
11+
"updated_time": "2121-07-25T13:34:26Z",
1212
"include_deleted": true
1313
},
1414
"ads": {
15-
"updated_time": "2021-07-25T13:34:26Z",
15+
"updated_time": "2121-07-25T13:34:26Z",
1616
"include_deleted": true
1717
},
1818
"ads_insights": {
19-
"date_start": "2021-07-25T13:34:26Z",
19+
"date_start": "2121-07-25T13:34:26Z",
2020
"include_deleted": true
2121
},
2222
"ads_insights_age_and_gender": {
23-
"date_start": "2021-07-25T13:34:26Z",
23+
"date_start": "2121-07-25T13:34:26Z",
2424
"include_deleted": true
2525
},
2626
"ads_insights_country": {
27-
"date_start": "2021-07-25T13:34:26Z",
27+
"date_start": "2121-07-25T13:34:26Z",
2828
"include_deleted": true
2929
},
3030
"ads_insights_dma": {
31-
"date_start": "2021-07-25T13:34:26Z",
31+
"date_start": "2121-07-25T13:34:26Z",
3232
"include_deleted": true
3333
},
3434
"ads_insights_platfrom_and_device": {
35-
"date_start": "2021-07-25T13:34:26Z",
35+
"date_start": "2121-07-25T13:34:26Z",
3636
"include_deleted": true
3737
},
3838
"ads_insights_region": {
39-
"date_start": "2021-07-25T13:34:26Z",
39+
"date_start": "2121-07-25T13:34:26Z",
4040
"include_deleted": true
4141
}
4242
}

airbyte-integrations/connectors/source-facebook-marketing/integration_tests/invalid_config.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"start_date": "2021-04-01T00:00:00Z",
33
"account_id": "account",
44
"access_token": "wrong_token",
5-
"include_deleted": "true"
5+
"include_deleted": true
66
}

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_sets.json

+3-18
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,10 @@
4848
"format": "date-time"
4949
},
5050
"daily_budget": {
51-
"type": ["null", "number"],
52-
"maximum": 100000000000000000000000000000000,
53-
"minimum": -100000000000000000000000000000000,
54-
"multipleOf": 0.000001,
55-
"exclusiveMaximum": true,
56-
"exclusiveMinimum": true
51+
"type": ["null", "number"]
5752
},
5853
"budget_remaining": {
59-
"type": ["null", "number"],
60-
"maximum": 100000000000000000000000000000000,
61-
"minimum": -100000000000000000000000000000000,
62-
"multipleOf": 0.000001,
63-
"exclusiveMaximum": true,
64-
"exclusiveMinimum": true
54+
"type": ["null", "number"]
6555
},
6656
"effective_status": {
6757
"type": ["null", "string"]
@@ -78,12 +68,7 @@
7868
"format": "date-time"
7969
},
8070
"lifetime_budget": {
81-
"type": ["null", "number"],
82-
"maximum": 100000000000000000000000000000000,
83-
"minimum": -100000000000000000000000000000000,
84-
"multipleOf": 0.000001,
85-
"exclusiveMaximum": true,
86-
"exclusiveMinimum": true
71+
"type": ["null", "number"]
8772
},
8873
"targeting": { "$ref": "targeting.json" },
8974
"bid_info": {

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,18 @@
148148
"type": ["null", "number"]
149149
},
150150
"created_time": {
151-
"format": "date-time",
151+
"format": "date",
152152
"type": ["null", "string"]
153153
},
154154
"ctr": {
155155
"type": ["null", "number"]
156156
},
157157
"date_start": {
158-
"format": "date-time",
158+
"format": "date",
159159
"type": ["null", "string"]
160160
},
161161
"date_stop": {
162-
"format": "date-time",
162+
"format": "date",
163163
"type": ["null", "string"]
164164
},
165165
"engagement_rate_ranking": {
@@ -280,7 +280,7 @@
280280
"$ref": "ads_action_stats.json"
281281
},
282282
"updated_time": {
283-
"format": "date-time",
283+
"format": "date",
284284
"type": ["null", "string"]
285285
},
286286
"video_15_sec_watched_actions": {

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py

+55-5
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from abc import ABC
2828
from collections import deque
2929
from datetime import datetime
30-
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
30+
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union
3131

3232
import backoff
3333
import pendulum
@@ -107,7 +107,57 @@ def read_records(
107107
) -> Iterable[Mapping[str, Any]]:
108108
"""Main read method used by CDK"""
109109
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
110-
yield self._extend_record(record, fields=self.fields)
110+
yield self.transform(self._extend_record(record, fields=self.fields))
111+
112+
def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
113+
"""
114+
Use this method to remove update fields types in record according to schema.
115+
"""
116+
schema = self.get_json_schema()
117+
self.convert_to_schema_types(record, schema["properties"])
118+
return record
119+
120+
def get_python_type(self, _types: Union[list, str]) -> tuple:
121+
types_mapping = {
122+
"string": str,
123+
"number": float,
124+
"integer": int,
125+
"null": None,
126+
"object": dict,
127+
"array": list,
128+
"boolean": bool,
129+
}
130+
131+
if isinstance(_types, list):
132+
return tuple([types_mapping[t] for t in _types if t != "null"])
133+
134+
return (types_mapping[_types],)
135+
136+
def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
137+
"""
138+
Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value
139+
and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string.
140+
This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all
141+
types from schema.
142+
"""
143+
for key, value in record.items():
144+
if key not in schema:
145+
continue
146+
147+
if isinstance(value, dict):
148+
self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {}))
149+
elif isinstance(value, list) and "items" in schema[key]:
150+
for record_list_item in value:
151+
if list in self.get_python_type(schema[key]["items"]["type"]):
152+
# TODO Currently we don't have support for list of lists.
153+
pass
154+
elif dict in self.get_python_type(schema[key]["items"]["type"]):
155+
self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"])
156+
elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
157+
record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)
158+
159+
if not isinstance(value, self.get_python_type(schema[key]["type"])):
160+
record[key] = self.get_python_type(schema[key]["type"])[0](value)
111161

112162
def _read_records(self, params: Mapping[str, Any]) -> Iterable:
113163
"""Wrapper around query to backoff errors.
@@ -295,7 +345,7 @@ class AdsInsights(FBMarketingIncrementalStream):
295345
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
296346
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
297347
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
298-
MAX_ASYNC_JOBS = 3
348+
MAX_ASYNC_JOBS = 10
299349
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)
300350

301351
action_breakdowns = ALL_ACTION_BREAKDOWNS
@@ -322,7 +372,7 @@ def read_records(
322372
# because we query `lookback_window` days before actual cursor we might get records older then cursor
323373

324374
for obj in result.get_result():
325-
yield obj.export_all_data()
375+
yield self.transform(obj.export_all_data())
326376

327377
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
328378
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
@@ -353,7 +403,7 @@ def wait_for_job(self, job) -> AdReportRun:
353403
job = job.api_get()
354404
job_progress_pct = job["async_percent_completion"]
355405
job_id = job["report_run_id"]
356-
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
406+
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
357407
runtime = pendulum.now() - start_time
358408

359409
if job["async_status"] == "Job Completed":

docs/integrations/sources/facebook-marketing.md

+7-6
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ See Facebook's [documentation on rate limiting](https://developers.facebook.com/
5656

5757
### Requirements
5858

59-
* A Facebook Ad Account ID
59+
* A Facebook Ad Account ID
6060
* A Facebook App which has the Marketing API enabled
6161
* A Facebook Marketing API Access Token
6262
* Request a rate limit increase from Facebook
@@ -101,15 +101,16 @@ With the Ad Account ID and API access token, you should be ready to start pullin
101101

102102
| Version | Date | Pull Request | Subject |
103103
| :------ | :-------- | :----- | :------ |
104-
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management|
105-
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:<br>- Improve error handling.<br>- Improve async job performance (insights).<br>- Add new configuration parameter `insights_days_per_job`.<br>- Rename stream `adsets` to `ad_sets`.<br>- Refactor schema logic for insights, allowing to configure any possible insight stream.|
106-
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0|
104+
| 0.2.15 | 2021-09-08 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
105+
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
106+
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:<br>- Improve error handling.<br>- Improve async job performance (insights).<br>- Add new configuration parameter `insights_days_per_job`.<br>- Rename stream `adsets` to `ad_sets`.<br>- Refactor schema logic for insights, allowing to configure any possible insight stream. |
107+
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0 |
107108
| 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
108-
| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code|
109+
| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code |
109110
| 0.2.7 | 2021-06-03 | [3646](https://github.com/airbytehq/airbyte/pull/3646) | Add missing fields to AdInsights streams |
110111
| 0.2.6 | 2021-05-25 | [3525](https://github.com/airbytehq/airbyte/pull/3525) | Fix handling call rate limit |
111112
| 0.2.5 | 2021-05-20 | [3396](https://github.com/airbytehq/airbyte/pull/3396) | Allow configuring insights lookback window |
112-
| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync|
113+
| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync |
113114
| 0.2.3 | 2021-04-28 | [3116](https://github.com/airbytehq/airbyte/pull/3116) | Wait longer (5 min) for async jobs to start |
114115
| 0.2.2 | 2021-04-03 | [2726](https://github.com/airbytehq/airbyte/pull/2726) | Fix base connector versioning |
115116
| 0.2.1 | 2021-03-12 | [2391](https://github.com/airbytehq/airbyte/pull/2391) | Support FB Marketing API v10 |

0 commit comments

Comments
 (0)