Skip to content

Commit 444dbff

Browse files
tolik0jatinyadav-cc
authored andcommitted
🐛 Source Facebook Marketing: Fix error during transforming state (airbytehq#35467)
1 parent 3e87064 commit 444dbff

File tree

16 files changed

+322
-81
lines changed

16 files changed

+322
-81
lines changed

airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json

+6-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,12 @@
7575
"type": "STREAM",
7676
"stream": {
7777
"stream_state": {
78-
"updated_time": "2121-07-25T13:34:26Z",
79-
"filter_statuses": ["ARCHIVED"]
78+
"212551616838260": {
79+
"updated_time": "2121-07-25T13:34:26Z",
80+
"filter_statuses": ["ARCHIVED"],
81+
"include_deleted": true
82+
},
83+
"include_deleted": true
8084
},
8185
"stream_descriptor": {
8286
"name": "ads"

airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
13-
dockerImageTag: 1.4.0
13+
dockerImageTag: 1.4.1
1414
dockerRepository: airbyte/source-facebook-marketing
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
1616
githubIssueLabel: source-facebook-marketing
@@ -23,7 +23,6 @@ data:
2323
packageName: airbyte-source-facebook-marketing
2424
registries:
2525
cloud:
26-
dockerImageTag: 1.3.2
2726
enabled: true
2827
oss:
2928
enabled: true

airbyte-integrations/connectors/source-facebook-marketing/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "1.4.0"
6+
version = "1.4.1"
77
name = "source-facebook-marketing"
88
description = "Source implementation for Facebook Marketing."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/config_migrations.py

+41
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from airbyte_cdk.entrypoint import AirbyteEntrypoint
1111
from airbyte_cdk.sources import Source
1212
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
13+
from source_facebook_marketing.spec import ValidAdSetStatuses, ValidAdStatuses, ValidCampaignStatuses
1314

1415
logger = logging.getLogger("airbyte_logger")
1516

@@ -80,3 +81,43 @@ def migrate(cls, args: List[str], source: Source) -> None:
8081
cls.emit_control_message(
8182
cls.modify_and_save(config_path, source, config),
8283
)
84+
85+
86+
class MigrateIncludeDeletedToStatusFilters(MigrateAccountIdToArray):
87+
"""
88+
This class stands for migrating the config at runtime.
89+
This migration is backwards compatible with the previous version, as new property will be created.
90+
When falling back to the previous source version connector will use old property `include_deleted`.
91+
92+
Starting from `1.4.0`, the `include_deleted` property is replaced with `ad_statuses`,
93+
`ad_statuses` and `campaign_statuses` which represent status filters.
94+
"""
95+
96+
migrate_from_key: str = "include_deleted"
97+
migrate_to_key: str = "ad_statuses"
98+
stream_filter_to_statuses: Mapping[str, List[str]] = {
99+
"ad_statuses": [status.value for status in ValidAdStatuses],
100+
"adset_statuses": [status.value for status in ValidAdSetStatuses],
101+
"campaign_statuses": [status.value for status in ValidCampaignStatuses],
102+
}
103+
104+
@classmethod
105+
def should_migrate(cls, config: Mapping[str, Any]) -> bool:
106+
"""
107+
This method determines whether the config should be migrated to have the new property for filters.
108+
Returns:
109+
> True, if the transformation is necessary
110+
> False, otherwise.
111+
> Raises the Exception if the structure could not be migrated.
112+
"""
113+
config_is_updated = config.get(cls.migrate_to_key)
114+
no_include_deleted = not config.get(cls.migrate_from_key)
115+
return False if config_is_updated or no_include_deleted else True
116+
117+
@classmethod
118+
def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
119+
# transform the config
120+
for stream_filter, statuses in cls.stream_filter_to_statuses.items():
121+
config[stream_filter] = statuses
122+
# return transformed config
123+
return config

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py

-4
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,6 @@ def _transform_state_from_one_account_format(self, state: Mapping[str, Any], mov
152152
return {}
153153

154154
def _transform_state_from_old_deleted_format(self, state: Mapping[str, Any]):
155-
if all("filter_statuses" in account_state for account_state in state.values()):
156-
# state is already in the new format
157-
return state
158-
159155
# transform from the old format with `include_deleted`
160156
for account_id in self._account_ids:
161157
account_state = state.get(account_id, {})

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -164,27 +164,29 @@ def test_stream_slices_single_account_empty_state(self, incremental_class_instan
164164
[
165165
# Test case 1: State date is used because fewer filters are used
166166
(
167-
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}},
167+
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}, "include_deleted": True},
168168
{"account_id": "123", "date": "2021-01-20T00:00:00+00:00"},
169169
{
170170
"123": {
171171
"date": "2021-01-30T00:00:00+00:00",
172172
"filter_statuses": ["ACTIVE"],
173173
"include_deleted": True,
174-
}
174+
},
175+
"include_deleted": True,
175176
},
176177
["ACTIVE"],
177178
),
178179
# Test case 2: State date is used because filter_statuses is the same as include_deleted
179180
(
180-
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}},
181+
{"123": {"date": "2021-01-30T00:00:00+00:00", "include_deleted": True}, "include_deleted": True},
181182
{"account_id": "123", "date": "2021-01-20T00:00:00+00:00"},
182183
{
183184
"123": {
184185
"date": "2021-01-30T00:00:00+00:00",
185186
"filter_statuses": ["ACTIVE", "PAUSED", "DELETED"],
186187
"include_deleted": True,
187-
}
188+
},
189+
"include_deleted": True,
188190
},
189191
["ACTIVE", "PAUSED", "DELETED"],
190192
),

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_config_migrations.py

+127-66
Original file line numberDiff line numberDiff line change
@@ -6,83 +6,144 @@
66
import json
77
from typing import Any, Mapping
88

9+
import pytest
910
from airbyte_cdk.models import OrchestratorType, Type
1011
from airbyte_cdk.sources import Source
11-
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray
12+
from source_facebook_marketing.config_migrations import MigrateAccountIdToArray, MigrateIncludeDeletedToStatusFilters
1213
from source_facebook_marketing.source import SourceFacebookMarketing
1314

1415
# BASE ARGS
1516
CMD = "check"
16-
TEST_CONFIG_PATH = "unit_tests/test_migrations/test_old_config.json"
17-
NEW_TEST_CONFIG_PATH = "unit_tests/test_migrations/test_new_config.json"
18-
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/test_upgraded_config.json"
19-
SOURCE_INPUT_ARGS = [CMD, "--config", TEST_CONFIG_PATH]
2017
SOURCE: Source = SourceFacebookMarketing()
2118

2219

2320
# HELPERS
24-
def load_config(config_path: str = TEST_CONFIG_PATH) -> Mapping[str, Any]:
21+
def load_config(config_path: str) -> Mapping[str, Any]:
2522
with open(config_path, "r") as config:
2623
return json.load(config)
2724

2825

29-
def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
30-
with open(config_path, "r") as test_config:
31-
config = json.load(test_config)
32-
config.pop("account_ids")
33-
with open(config_path, "w") as updated_config:
34-
config = json.dumps(config)
35-
updated_config.write(config)
36-
37-
38-
def test_migrate_config():
39-
migration_instance = MigrateAccountIdToArray()
40-
original_config = load_config()
41-
# migrate the test_config
42-
migration_instance.migrate(SOURCE_INPUT_ARGS, SOURCE)
43-
# load the updated config
44-
test_migrated_config = load_config()
45-
# check migrated property
46-
assert "account_ids" in test_migrated_config
47-
assert isinstance(test_migrated_config["account_ids"], list)
48-
# check the old property is in place
49-
assert "account_id" in test_migrated_config
50-
assert isinstance(test_migrated_config["account_id"], str)
51-
# check the migration should be skipped, once already done
52-
assert not migration_instance.should_migrate(test_migrated_config)
53-
# load the old custom reports VS migrated
54-
assert [original_config["account_id"]] == test_migrated_config["account_ids"]
55-
# test CONTROL MESSAGE was emitted
56-
control_msg = migration_instance.message_repository._message_queue[0]
57-
assert control_msg.type == Type.CONTROL
58-
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
59-
# old custom_reports are stil type(str)
60-
assert isinstance(control_msg.control.connectorConfig.config["account_id"], str)
61-
# new custom_reports are type(list)
62-
assert isinstance(control_msg.control.connectorConfig.config["account_ids"], list)
63-
# check the migrated values
64-
assert control_msg.control.connectorConfig.config["account_ids"] == ["01234567890"]
65-
# revert the test_config to the starting point
66-
revert_migration()
67-
68-
69-
def test_config_is_reverted():
70-
# check the test_config state, it has to be the same as before tests
71-
test_config = load_config()
72-
# check the config no longer has the migarted property
73-
assert "account_ids" not in test_config
74-
# check the old property is still there
75-
assert "account_id" in test_config
76-
assert isinstance(test_config["account_id"], str)
77-
78-
79-
def test_should_not_migrate_new_config():
80-
new_config = load_config(NEW_TEST_CONFIG_PATH)
81-
migration_instance = MigrateAccountIdToArray()
82-
assert not migration_instance.should_migrate(new_config)
83-
84-
85-
def test_should_not_migrate_upgraded_config():
86-
new_config = load_config(UPGRADED_TEST_CONFIG_PATH)
87-
migration_instance = MigrateAccountIdToArray()
88-
assert not migration_instance.should_migrate(new_config)
26+
class TestMigrateAccountIdToArray:
27+
TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_old_config.json"
28+
NEW_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_new_config.json"
29+
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"
30+
31+
@staticmethod
32+
def revert_migration(config_path: str = TEST_CONFIG_PATH) -> None:
33+
with open(config_path, "r") as test_config:
34+
config = json.load(test_config)
35+
config.pop("account_ids")
36+
with open(config_path, "w") as updated_config:
37+
config = json.dumps(config)
38+
updated_config.write(config)
39+
40+
def test_migrate_config(self):
41+
migration_instance = MigrateAccountIdToArray()
42+
original_config = load_config(self.TEST_CONFIG_PATH)
43+
# migrate the test_config
44+
migration_instance.migrate([CMD, "--config", self.TEST_CONFIG_PATH], SOURCE)
45+
# load the updated config
46+
test_migrated_config = load_config(self.TEST_CONFIG_PATH)
47+
# check migrated property
48+
assert "account_ids" in test_migrated_config
49+
assert isinstance(test_migrated_config["account_ids"], list)
50+
# check the old property is in place
51+
assert "account_id" in test_migrated_config
52+
assert isinstance(test_migrated_config["account_id"], str)
53+
# check the migration should be skipped, once already done
54+
assert not migration_instance.should_migrate(test_migrated_config)
55+
# load the old custom reports VS migrated
56+
assert [original_config["account_id"]] == test_migrated_config["account_ids"]
57+
# test CONTROL MESSAGE was emitted
58+
control_msg = migration_instance.message_repository._message_queue[0]
59+
assert control_msg.type == Type.CONTROL
60+
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
61+
# old custom_reports are stil type(str)
62+
assert isinstance(control_msg.control.connectorConfig.config["account_id"], str)
63+
# new custom_reports are type(list)
64+
assert isinstance(control_msg.control.connectorConfig.config["account_ids"], list)
65+
# check the migrated values
66+
assert control_msg.control.connectorConfig.config["account_ids"] == ["01234567890"]
67+
# revert the test_config to the starting point
68+
self.revert_migration()
69+
70+
def test_config_is_reverted(self):
71+
# check the test_config state, it has to be the same as before tests
72+
test_config = load_config(self.TEST_CONFIG_PATH)
73+
# check the config no longer has the migarted property
74+
assert "account_ids" not in test_config
75+
# check the old property is still there
76+
assert "account_id" in test_config
77+
assert isinstance(test_config["account_id"], str)
78+
79+
def test_should_not_migrate_new_config(self):
80+
new_config = load_config(self.NEW_TEST_CONFIG_PATH)
81+
migration_instance = MigrateAccountIdToArray()
82+
assert not migration_instance.should_migrate(new_config)
83+
84+
def test_should_not_migrate_upgraded_config(self):
85+
new_config = load_config(self.UPGRADED_TEST_CONFIG_PATH)
86+
migration_instance = MigrateAccountIdToArray()
87+
assert not migration_instance.should_migrate(new_config)
88+
89+
90+
class TestMigrateIncludeDeletedToStatusFilters:
91+
OLD_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_old_config.json"
92+
NEW_TEST1_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_false/test_new_config.json"
93+
OLD_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_old_config.json"
94+
NEW_TEST2_CONFIG_PATH = "unit_tests/test_migrations/include_deleted_to_status_filters/include_deleted_true/test_new_config.json"
95+
96+
UPGRADED_TEST_CONFIG_PATH = "unit_tests/test_migrations/account_id_to_array/test_upgraded_config.json"
97+
98+
filter_properties = ["ad_statuses", "adset_statuses", "campaign_statuses"]
99+
100+
def revert_migration(self, config_path: str) -> None:
101+
with open(config_path, "r") as test_config:
102+
config = json.load(test_config)
103+
for filter in self.filter_properties:
104+
config.pop(filter)
105+
with open(config_path, "w") as updated_config:
106+
config = json.dumps(config)
107+
updated_config.write(config)
108+
109+
@pytest.mark.parametrize(
110+
"old_config_path, new_config_path, include_deleted",
111+
[(OLD_TEST1_CONFIG_PATH, NEW_TEST1_CONFIG_PATH, False), (OLD_TEST2_CONFIG_PATH, NEW_TEST2_CONFIG_PATH, True)],
112+
)
113+
def test_migrate_config(self, old_config_path, new_config_path, include_deleted):
114+
migration_instance = MigrateIncludeDeletedToStatusFilters()
115+
original_config = load_config(old_config_path)
116+
# migrate the test_config
117+
migration_instance.migrate([CMD, "--config", old_config_path], SOURCE)
118+
# load the updated config
119+
test_migrated_config = load_config(old_config_path)
120+
# load expected updated config
121+
expected_new_config = load_config(new_config_path)
122+
# compare expected with migrated
123+
assert expected_new_config == test_migrated_config
124+
# check migrated property
125+
if include_deleted:
126+
assert all([filter in test_migrated_config for filter in self.filter_properties])
127+
# check the old property is in place
128+
assert "include_deleted" in test_migrated_config
129+
assert test_migrated_config["include_deleted"] == include_deleted
130+
# check the migration should be skipped, once already done
131+
assert not migration_instance.should_migrate(test_migrated_config)
132+
if include_deleted:
133+
# test CONTROL MESSAGE was emitted
134+
control_msg = migration_instance.message_repository._message_queue[0]
135+
assert control_msg.type == Type.CONTROL
136+
assert control_msg.control.type == OrchestratorType.CONNECTOR_CONFIG
137+
# revert the test_config to the starting point
138+
self.revert_migration(old_config_path)
139+
140+
@pytest.mark.parametrize("new_config_path", [NEW_TEST1_CONFIG_PATH, NEW_TEST2_CONFIG_PATH])
141+
def test_should_not_migrate_new_config(self, new_config_path):
142+
new_config = load_config(new_config_path)
143+
migration_instance = MigrateIncludeDeletedToStatusFilters()
144+
assert not migration_instance.should_migrate(new_config)
145+
146+
def test_should_not_migrate_upgraded_config(self):
147+
new_config = load_config(self.UPGRADED_TEST_CONFIG_PATH)
148+
migration_instance = MigrateIncludeDeletedToStatusFilters()
149+
assert not migration_instance.should_migrate(new_config)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"start_date": "2021-02-08T00:00:00Z",
3+
"end_date": "2021-02-15T00:00:00Z",
4+
"custom_insights": [
5+
{
6+
"name": "custom_insight_stream",
7+
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
8+
"breakdowns": ["gender"],
9+
"action_breakdowns": []
10+
}
11+
],
12+
"account_ids": ["01234567890"],
13+
"access_token": "access_token",
14+
"include_deleted": false
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"start_date": "2021-02-08T00:00:00Z",
3+
"end_date": "2021-02-15T00:00:00Z",
4+
"custom_insights": [
5+
{
6+
"name": "custom_insight_stream",
7+
"fields": ["account_name", "clicks", "cpc", "account_id", "ad_id"],
8+
"breakdowns": ["gender"],
9+
"action_breakdowns": []
10+
}
11+
],
12+
"include_deleted": false,
13+
"account_ids": ["01234567890"],
14+
"access_token": "access_token"
15+
}

0 commit comments

Comments
 (0)