From f9269826e97894d5f457c513569f3a4ba969ac17 Mon Sep 17 00:00:00 2001 From: oleh Date: Tue, 7 Sep 2021 23:56:12 +0300 Subject: [PATCH 1/7] Convert values' types according to schema types --- .../acceptance-test-config.yml | 11 ++-- .../integration_tests/configured_catalog.json | 30 ---------- ...{abnormal_state.json => future_state.json} | 20 +++---- .../integration_tests/invalid_config.json | 2 +- .../schemas/ad_sets.json | 21 +------ .../schemas/ads_insights.json | 8 +-- .../source_facebook_marketing/streams.py | 60 +++++++++++++++++-- .../sources/facebook-marketing.md | 13 ++-- 8 files changed, 86 insertions(+), 79 deletions(-) rename airbyte-integrations/connectors/source-facebook-marketing/integration_tests/{abnormal_state.json => future_state.json} (57%) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml index 2e84915bed60d..ac45f955c0cba 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml @@ -8,18 +8,19 @@ tests: - config_path: "secrets/config.json" status: "succeed" - config_path: "integration_tests/invalid_config.json" - status: "exception" + status: "failed" discovery: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - # FB serializes numeric fields as strings - validate_schema: no + timeout_seconds: 600 incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_without_insights.json" - future_state_path: "integration_tests/abnormal_state.json" + future_state_path: "integration_tests/future_state.json" full_refresh: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" + # TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021 + # because all running campaigns should be finished by that time. + configured_catalog_path: "integration_tests/configured_catalog_without_insights.json" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json index 67dc36d5d8071..fee4405518324 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json @@ -59,36 +59,6 @@ "cursor_field": null, "destination_sync_mode": "append", "primary_key": null - }, - { - "stream": { - "name": "ads_insights", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["date_start"], - "source_defined_primary_key": null, - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": ["date_start"], - "destination_sync_mode": "append", - "primary_key": null - }, - { - "stream": { - "name": "ads_insights_age_and_gender", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["date_start"], - "source_defined_primary_key": null, - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": ["date_start"], - "destination_sync_mode": "append", - "primary_key": null } ] } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json similarity index 57% rename from airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json rename to airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json index 437161ace3dce..f516de3682e07 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json @@ -1,42 +1,42 @@ { "campaigns": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ad_creatives": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ad_sets": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_age_and_gender": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_country": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_dma": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_platfrom_and_device": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_region": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true } } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/invalid_config.json index f655e2b17a218..f7b8210b9e5db 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/invalid_config.json @@ -2,5 +2,5 @@ "start_date": "2021-04-01T00:00:00Z", "account_id": "account", "access_token": "wrong_token", - "include_deleted": "true" + "include_deleted": true } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_sets.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_sets.json index b5ad2fcae6f71..bb203911eb613 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_sets.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_sets.json @@ -48,20 +48,10 @@ "format": "date-time" }, "daily_budget": { - "type": ["null", "number"], - "maximum": 100000000000000000000000000000000, - "minimum": -100000000000000000000000000000000, - "multipleOf": 0.000001, - "exclusiveMaximum": true, - "exclusiveMinimum": true + "type": ["null", "number"] }, "budget_remaining": { - "type": ["null", "number"], - "maximum": 100000000000000000000000000000000, - "minimum": -100000000000000000000000000000000, - "multipleOf": 0.000001, - "exclusiveMaximum": true, - "exclusiveMinimum": true + "type": ["null", "number"] }, "effective_status": { "type": ["null", "string"] @@ -78,12 +68,7 @@ "format": "date-time" }, "lifetime_budget": { - "type": ["null", "number"], - "maximum": 100000000000000000000000000000000, - "minimum": -100000000000000000000000000000000, - "multipleOf": 0.000001, - "exclusiveMaximum": true, - "exclusiveMinimum": true + "type": ["null", "number"] }, "targeting": { "$ref": "targeting.json" }, "bid_info": { diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json index 3361f870bc9bd..702de57b79696 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json @@ -148,18 +148,18 @@ "type": ["null", "number"] }, "created_time": { - "format": "date-time", + "format": "date", "type": ["null", "string"] }, "ctr": { "type": ["null", "number"] }, "date_start": { - "format": "date-time", + "format": "date", "type": ["null", "string"] }, "date_stop": { - "format": "date-time", + "format": "date", "type": ["null", "string"] }, "engagement_rate_ranking": { @@ -280,7 +280,7 @@ "$ref": "ads_action_stats.json" }, "updated_time": { - "format": "date-time", + "format": "date", "type": ["null", "string"] }, "video_15_sec_watched_actions": { diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index 1c375bb4f7bb5..d6a8292cff0b3 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -27,7 +27,7 @@ from abc import ABC from collections import deque from datetime import datetime -from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence +from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union import backoff import pendulum @@ -107,7 +107,57 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: """Main read method used by CDK""" for record in self._read_records(params=self.request_params(stream_state=stream_state)): - yield self._extend_record(record, fields=self.fields) + yield self.transform(self._extend_record(record, fields=self.fields)) + + def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Use this method to remove update fields types in record according to schema. + """ + schema = self.get_json_schema() + self.convert_to_schema_types(record, schema["properties"]) + return record + + def get_python_type(self, _types: Union[list, str]) -> tuple: + types_mapping = { + "string": str, + "number": float, + "integer": int, + "null": None, + "object": dict, + "array": list, + "boolean": bool, + } + + if isinstance(_types, list): + return tuple([types_mapping[t] for t in _types if t != "null"]) + + return (types_mapping[_types],) + + def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]): + """ + Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value + and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string. + This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all + types from schema. + """ + for key, value in record.items(): + if key not in schema: + continue + + if isinstance(value, dict): + self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {})) + elif isinstance(value, list) and "items" in schema[key]: + for record_list_item in value: + if list in self.get_python_type(schema[key]["items"]["type"]): + # TODO Currently we don't have support for list of lists. + pass + elif dict in self.get_python_type(schema[key]["items"]["type"]): + self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"]) + elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): + record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) + + if not isinstance(value, self.get_python_type(schema[key]["type"])): + record[key] = self.get_python_type(schema[key]["type"])[0](value) def _read_records(self, params: Mapping[str, Any]) -> Iterable: """Wrapper around query to backoff errors. @@ -295,7 +345,7 @@ class AdsInsights(FBMarketingIncrementalStream): MAX_WAIT_TO_START = pendulum.duration(minutes=5) MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30) MAX_ASYNC_SLEEP = pendulum.duration(minutes=5) - MAX_ASYNC_JOBS = 3 + MAX_ASYNC_JOBS = 10 INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30) action_breakdowns = ALL_ACTION_BREAKDOWNS @@ -322,7 +372,7 @@ def read_records( # because we query `lookback_window` days before actual cursor we might get records older then cursor for obj in result.get_result(): - yield obj.export_all_data() + yield self.transform(obj.export_all_data()) def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. @@ -353,7 +403,7 @@ def wait_for_job(self, job) -> AdReportRun: job = job.api_get() job_progress_pct = job["async_percent_completion"] job_id = job["report_run_id"] - self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete") + self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})") runtime = pendulum.now() - start_time if job["async_status"] == "Job Completed": diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index b7acd90d17d13..49e18ebb0d424 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -56,7 +56,7 @@ See Facebook's [documentation on rate limiting](https://developers.facebook.com/ ### Requirements -* A Facebook Ad Account ID +* A Facebook Ad Account ID * A Facebook App which has the Marketing API enabled * A Facebook Marketing API Access Token * Request a rate limit increase from Facebook @@ -101,15 +101,16 @@ With the Ad Account ID and API access token, you should be ready to start pullin | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management| -| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:
- Improve error handling.
- Improve async job performance (insights).
- Add new configuration parameter `insights_days_per_job`.
- Rename stream `adsets` to `ad_sets`.
- Refactor schema logic for insights, allowing to configure any possible insight stream.| -| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0| +| 0.2.15 | 2021-09-08 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types | +| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management | +| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:
- Improve error handling.
- Improve async job performance (insights).
- Add new configuration parameter `insights_days_per_job`.
- Rename stream `adsets` to `ad_sets`.
- Refactor schema logic for insights, allowing to configure any possible insight stream. | +| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0 | | 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support | -| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code| +| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code | | 0.2.7 | 2021-06-03 | [3646](https://github.com/airbytehq/airbyte/pull/3646) | Add missing fields to AdInsights streams | | 0.2.6 | 2021-05-25 | [3525](https://github.com/airbytehq/airbyte/pull/3525) | Fix handling call rate limit | | 0.2.5 | 2021-05-20 | [3396](https://github.com/airbytehq/airbyte/pull/3396) | Allow configuring insights lookback window | -| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync| +| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync | | 0.2.3 | 2021-04-28 | [3116](https://github.com/airbytehq/airbyte/pull/3116) | Wait longer (5 min) for async jobs to start | | 0.2.2 | 2021-04-03 | [2726](https://github.com/airbytehq/airbyte/pull/2726) | Fix base connector versioning | | 0.2.1 | 2021-03-12 | [2391](https://github.com/airbytehq/airbyte/pull/2391) | Support FB Marketing API v10 | From 1d7d3311aeb37f07a7c4ec7d3eb7c5d89eece256 Mon Sep 17 00:00:00 2001 From: oleh Date: Wed, 8 Sep 2021 00:41:18 +0300 Subject: [PATCH 2/7] Put streams back to `configured_catalog.json` Put back `ads_insights` and `ads_insights_age_and_gender` streams. --- .../integration_tests/configured_catalog.json | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json index fee4405518324..67dc36d5d8071 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json @@ -59,6 +59,36 @@ "cursor_field": null, "destination_sync_mode": "append", "primary_key": null + }, + { + "stream": { + "name": "ads_insights", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date_start"], + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "incremental", + "cursor_field": ["date_start"], + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "ads_insights_age_and_gender", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date_start"], + "source_defined_primary_key": null, + "namespace": null + }, + "sync_mode": "incremental", + "cursor_field": ["date_start"], + "destination_sync_mode": "append", + "primary_key": null } ] } From 436f7ead031ffbc83102605de006c90fb120ff7b Mon Sep 17 00:00:00 2001 From: oleh Date: Fri, 10 Sep 2021 17:17:19 +0300 Subject: [PATCH 3/7] Pickup changes from #5946 --- .../source_facebook_marketing/schemas/ads_insights.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json index 702de57b79696..1428e29633076 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights.json @@ -214,7 +214,7 @@ "type": ["null", "number"] }, "instant_experience_outbound_clicks": { - "type": ["null", "integer"] + "$ref": "ads_action_stats.json" }, "labels": { "type": ["null", "string"] @@ -311,7 +311,7 @@ "$ref": "ads_action_stats.json" }, "video_play_actions": { - "$ref": "ads_histogram_stats.json" + "$ref": "ads_action_stats.json" }, "video_play_curve_actions": { "$ref": "ads_histogram_stats.json" From 1b545893811ed0ddcbff647a201eceb095092c59 Mon Sep 17 00:00:00 2001 From: oleh Date: Tue, 14 Sep 2021 11:05:36 +0300 Subject: [PATCH 4/7] Implement change request + fix previous PR --- .../source_facebook_marketing/streams.py | 23 ++++++++++++------- .../unit_tests/test_streams.py | 1 + 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index e7986d1136c8f..958c433c00c8b 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -46,7 +46,7 @@ backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) -def remove_params_from_url(url: str, params: [str]) -> str: +def remove_params_from_url(url: str, params: List[str]) -> str: """ Parses a URL and removes the query parameters specified in params :param url: URL @@ -121,11 +121,15 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: return record def get_python_type(self, _types: Union[list, str]) -> tuple: + """Converts types from schema to python types. Examples: + - `["string", "null"]` will be converted to `(str,)` + - `["array", "string", "null"]` will be converted to `(list, str,)` + - `"boolean"` will be converted to `(bool,)` + """ types_mapping = { "string": str, "number": float, "integer": int, - "null": None, "object": dict, "array": list, "boolean": bool, @@ -143,23 +147,26 @@ def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all types from schema. """ + if not schema: + return + for key, value in record.items(): if key not in schema: continue + items_type = schema[key]["items"]["type"] if isinstance(value, dict): self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {})) elif isinstance(value, list) and "items" in schema[key]: for record_list_item in value: - if list in self.get_python_type(schema[key]["items"]["type"]): + if list in self.get_python_type(items_type): # TODO Currently we don't have support for list of lists. pass - elif dict in self.get_python_type(schema[key]["items"]["type"]): + elif dict in self.get_python_type(items_type): self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"]) - elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): - record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) - - if not isinstance(value, self.get_python_type(schema[key]["type"])): + elif not isinstance(record_list_item, self.get_python_type(items_type)): + record[key] = self.get_python_type(items_type)[0](record_list_item) + elif not isinstance(value, self.get_python_type(schema[key]["type"])): record[key] = self.get_python_type(schema[key]["type"])[0](value) def _read_records(self, params: Mapping[str, Any]) -> Iterable: diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py index b41e09faeeb78..446c12cf316a1 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py @@ -21,6 +21,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # + from source_facebook_marketing.streams import remove_params_from_url From c0dbf49cb23ebf40f5a31a62b0db7fa9a6ea7ceb Mon Sep 17 00:00:00 2001 From: oleh Date: Tue, 14 Sep 2021 11:08:44 +0300 Subject: [PATCH 5/7] Update schema --- .../source_facebook_marketing/schemas/shared/targeting.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/shared/targeting.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/shared/targeting.json index 62137a1909196..b17fd1c819fc8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/shared/targeting.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/shared/targeting.json @@ -21,7 +21,7 @@ "$ref": "targeting.json#/definitions/id_name_pairs" }, "home_type": { - "$ref$": "targeting.json#/definitions/id_name_pairs" + "$ref": "targeting.json#/definitions/id_name_pairs" }, "friends_of_connections": { "$ref": "targeting.json#/definitions/id_name_pairs" From 65690aa2ca071fa5b1742159bf575e438ebf27f4 Mon Sep 17 00:00:00 2001 From: oleh Date: Tue, 14 Sep 2021 12:27:52 +0300 Subject: [PATCH 6/7] Remove items_type from convert_to_schema_types() --- .../source_facebook_marketing/streams.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index 958c433c00c8b..794aaa6ddbec7 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -154,18 +154,17 @@ def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str if key not in schema: continue - items_type = schema[key]["items"]["type"] if isinstance(value, dict): self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {})) elif isinstance(value, list) and "items" in schema[key]: for record_list_item in value: - if list in self.get_python_type(items_type): + if list in self.get_python_type(schema[key]["items"]["type"]): # TODO Currently we don't have support for list of lists. pass - elif dict in self.get_python_type(items_type): + elif dict in self.get_python_type(schema[key]["items"]["type"]): self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"]) - elif not isinstance(record_list_item, self.get_python_type(items_type)): - record[key] = self.get_python_type(items_type)[0](record_list_item) + elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): + record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) elif not isinstance(value, self.get_python_type(schema[key]["type"])): record[key] = self.get_python_type(schema[key]["type"])[0](value) From 12ea2d2e8f82650344523ae24b071d7313032a0a Mon Sep 17 00:00:00 2001 From: oleh Date: Tue, 14 Sep 2021 19:34:56 +0300 Subject: [PATCH 7/7] Bump connectors version --- .../e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../connectors/source-facebook-marketing/Dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json index 9575a7f02551a..3e649449fa893 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c", "name": "Facebook Marketing", "dockerRepository": "airbyte/source-facebook-marketing", - "dockerImageTag": "0.2.16", + "dockerImageTag": "0.2.17", "documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing", "icon": "facebook.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 59cb8126b03a4..a59ac24c1df02 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -137,7 +137,7 @@ - sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c name: Facebook Marketing dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.16 + dockerImageTag: 0.2.17 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg - sourceDefinitionId: 010eb12f-837b-4685-892d-0a39f76a98f5 diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 39c7b47cef269..2edff54bae844 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.16 +LABEL io.airbyte.version=0.2.17 LABEL io.airbyte.name=airbyte/source-facebook-marketing