2
2
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3
3
#
4
4
5
+ import logging
5
6
from datetime import datetime
7
+ from typing import Any , List , Mapping , Optional , Tuple , Type , MutableMapping
8
+ from jsonschema import RefResolver
9
+
10
+
11
+ from airbyte_cdk .logger import AirbyteLogger
12
+ from airbyte_cdk .models import AirbyteConnectionStatus , AuthSpecification , ConnectorSpecification , DestinationSyncMode , OAuth2Specification , Status
13
+
6
14
from typing import Any , List , Mapping , Optional , Tuple , Type
7
15
8
16
import pendulum
9
- from airbyte_cdk .models import AuthSpecification , ConnectorSpecification , DestinationSyncMode , OAuth2Specification
10
17
from airbyte_cdk .sources import AbstractSource
11
18
from airbyte_cdk .sources .streams import Stream
19
+ from airbyte_cdk .sources .streams .core import package_name_from_class
20
+ from airbyte_cdk .sources .utils .schema_helpers import ResourceSchemaLoader
12
21
from pydantic import BaseModel , Field
13
22
from source_facebook_marketing .api import API
14
23
from source_facebook_marketing .streams import (
26
35
)
27
36
28
37
38
+ logger = logging .getLogger (__name__ )
39
+
40
+ class InsightConfig (BaseModel ):
41
+
42
+ name : str = Field (description = "The name value of insight" )
43
+
44
+ fields : Optional [List [str ]] = Field (description = "A list of chosen fields for fields parameter" , default = [])
45
+
46
+ breakdowns : Optional [List [str ]] = Field (description = "A list of chosen breakdowns for breakdowns" , default = [])
47
+
48
+ action_breakdowns : Optional [List [str ]] = Field (description = "A list of chosen action_breakdowns for action_breakdowns" , default = [])
49
+
50
+
29
51
class ConnectorConfig (BaseModel ):
30
52
class Config :
31
53
title = "Source Facebook Marketing"
@@ -65,6 +87,9 @@ class Config:
65
87
minimum = 1 ,
66
88
maximum = 30 ,
67
89
)
90
+ custom_insights : Optional [List [InsightConfig ]] = Field (
91
+ description = "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
92
+ )
68
93
69
94
70
95
class SourceFacebookMarketing (AbstractSource ):
@@ -104,10 +129,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
104
129
days_per_job = config .insights_days_per_job ,
105
130
)
106
131
107
- return [
132
+ streams = [
108
133
Campaigns (api = api , start_date = config .start_date , end_date = config .end_date , include_deleted = config .include_deleted ),
109
134
AdSets (api = api , start_date = config .start_date , end_date = config .end_date , include_deleted = config .include_deleted ),
110
135
Ads (api = api , start_date = config .start_date , end_date = config .end_date , include_deleted = config .include_deleted ),
136
+
111
137
AdCreatives (api = api ),
112
138
AdsInsights (** insights_args ),
113
139
AdsInsightsAgeAndGender (** insights_args ),
@@ -118,6 +144,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
118
144
AdsInsightsActionType (** insights_args ),
119
145
]
120
146
147
+ return self ._update_insights_streams (insights = config .custom_insights , args = insights_args , streams = streams )
148
+
149
+ def check (self , logger : AirbyteLogger , config : Mapping [str , Any ]) -> AirbyteConnectionStatus :
150
+ """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
151
+ try :
152
+ check_succeeded , error = self .check_connection (logger , config )
153
+ if not check_succeeded :
154
+ return AirbyteConnectionStatus (status = Status .FAILED , message = repr (error ))
155
+ except Exception as e :
156
+ return AirbyteConnectionStatus (status = Status .FAILED , message = repr (e ))
157
+
158
+ self ._check_custom_insights_entries (config .get ('custom_insights' , []))
159
+
160
+ return AirbyteConnectionStatus (status = Status .SUCCEEDED )
161
+
162
+
121
163
def spec (self , * args , ** kwargs ) -> ConnectorSpecification :
122
164
"""
123
165
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
@@ -128,11 +170,80 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification:
128
170
changelogUrl = "https://docs.airbyte.io/integrations/sources/facebook-marketing" ,
129
171
supportsIncremental = True ,
130
172
supported_destination_sync_modes = [DestinationSyncMode .append ],
131
- connectionSpecification = ConnectorConfig .schema (),
173
+ connectionSpecification = expand_local_ref ( ConnectorConfig .schema () ),
132
174
authSpecification = AuthSpecification (
133
175
auth_type = "oauth2.0" ,
134
176
oauth2Specification = OAuth2Specification (
135
177
rootObject = [], oauthFlowInitParameters = [], oauthFlowOutputParameters = [["access_token" ]]
136
- ),
178
+ )
137
179
),
138
180
)
181
+
182
+ def _update_insights_streams (self , insights , args , streams ) -> List [Type [Stream ]]:
183
+ """Update method, if insights have values returns streams replacing the
184
+ default insights streams else returns streams
185
+
186
+ """
187
+ if not insights :
188
+ return streams
189
+
190
+ insights_custom_streams = list ()
191
+
192
+ for insight in insights :
193
+ args ["name" ] = f"Custom{ insight .name } "
194
+ args ["fields" ] = list (set (insight .fields ))
195
+ args ["breakdowns" ] = list (set (insight .breakdowns ))
196
+ args ["action_breakdowns" ] = list (set (insight .action_breakdowns ))
197
+ insight_stream = AdsInsights (** args )
198
+ insights_custom_streams .append (insight_stream )
199
+
200
+ return streams + insights_custom_streams
201
+
202
+ def _check_custom_insights_entries (self , insights : List [Mapping [str , Any ]]):
203
+
204
+ default_fields = list (ResourceSchemaLoader (package_name_from_class (self .__class__ )).get_schema ("ads_insights" ).get ("properties" , {}).keys ())
205
+ default_breakdowns = list (ResourceSchemaLoader (package_name_from_class (self .__class__ )).get_schema ("ads_insights_breakdowns" ).get ("properties" , {}).keys ())
206
+ default_actions_breakdowns = [e for e in default_breakdowns if 'action_' in e ]
207
+
208
+ for insight in insights :
209
+ if insight .get ('fields' ):
210
+ value_checked , value = self ._check_values (default_fields , insight .get ('fields' ))
211
+ if not value_checked :
212
+ message = f"{ value } is not a valid field name"
213
+ raise Exception ("Config validation error: " + message ) from None
214
+ if insight .get ('breakdowns' ):
215
+ value_checked , value = self ._check_values (default_breakdowns , insight .get ('breakdowns' ))
216
+ if not value_checked :
217
+ message = f"{ value } is not a valid breakdown name"
218
+ raise Exception ("Config validation error: " + message ) from None
219
+ if insight .get ('action_breakdowns' ):
220
+ value_checked , value = self ._check_values (default_actions_breakdowns , insight .get ('action_breakdowns' ))
221
+ if not value_checked :
222
+ message = f"{ value } is not a valid action_breakdown name"
223
+ raise Exception ("Config validation error: " + message ) from None
224
+
225
+ return True
226
+
227
+ def _check_values (self , default_value : List [str ], custom_value : List [str ]) -> Tuple [bool , Any ]:
228
+ for e in custom_value :
229
+ if e not in default_value :
230
+ logger .error (f"{ e } does not appear in { default_value } " )
231
+ return False , e
232
+
233
+ return True , None
234
+
235
+
236
+ def expand_local_ref (schema , resolver = None , ** kwargs ):
237
+ resolver = resolver or RefResolver ("" , schema )
238
+ if isinstance (schema , MutableMapping ):
239
+ if "$ref" in schema :
240
+ ref_url = schema .pop ("$ref" )
241
+ url , resolved_schema = resolver .resolve (ref_url )
242
+ schema .update (resolved_schema )
243
+ for key , value in schema .items ():
244
+ schema [key ] = expand_local_ref (value , resolver = resolver )
245
+ return schema
246
+ elif isinstance (schema , List ):
247
+ return [expand_local_ref (item , resolver = resolver ) for item in schema ]
248
+
249
+ return schema
0 commit comments