@@ -107,7 +107,78 @@ def read_records(
     ) -> Iterable[Mapping[str, Any]]:
         """Main read method used by CDK"""
         for record in self._read_records(params=self.request_params(stream_state=stream_state)):
-            yield self._extend_record(record, fields=self.fields)
+            yield self.transform(self._extend_record(record, fields=self.fields))
+
+    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Cast field types in the record so they match the JSON schema."""
+        schema = self.get_json_schema()
+        self.lol_dict(record, schema["properties"])
+        return record
+
+    def get_python_type(self, _types):
+        """Map JSON-schema type names to Python types, as a tuple usable with isinstance()."""
+        types_mapping = {
+            "string": str,
+            "number": float,
+            "integer": int,
+            "null": None,
+            "object": dict,
+            "array": list,
+            "boolean": bool,
+        }
+
+        if isinstance(_types, list):
+            return tuple(types_mapping[t] for t in _types if t != "null")
+
+        # single type name: wrap it in a tuple; tuple(str) would try to iterate the type object
+        return (types_mapping[_types],)
+
+    def lol_dict(self, record, schema):
+        """Recursively cast values in `record` to the Python types declared in `schema`."""
+        for key, value in record.items():
+            if key not in schema:
+                continue
+
+            if isinstance(value, dict):
+                self.lol_dict(record=value, schema=schema[key].get("properties", {}))
+            elif isinstance(value, list) and "items" in schema[key]:
+                for i, record_list_item in enumerate(value):
+                    if list in self.get_python_type(schema[key]["items"]["type"]):
+                        # TODO If you have list of lists then add `if` below
+                        pass
+                    elif dict in self.get_python_type(schema[key]["items"]["type"]):
+                        self.lol_dict(record=record_list_item, schema=schema[key]["items"]["properties"])
+                    elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
+                        # cast the item in place; assigning to record[key] would replace the whole list
+                        value[i] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)
+
+            if not isinstance(value, self.get_python_type(schema[key]["type"])):
+                record[key] = self.get_python_type(schema[key]["type"])[0](value)
+
     def _read_records(self, params: Mapping[str, Any]) -> Iterable:
         """Wrapper around query to backoff errors.
@@ -295,7 +366,7 @@ class AdsInsights(FBMarketingIncrementalStream):
     MAX_WAIT_TO_START = pendulum.duration(minutes=5)
     MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
     MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
-    MAX_ASYNC_JOBS = 3
+    MAX_ASYNC_JOBS = 10
     INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)

     action_breakdowns = ALL_ACTION_BREAKDOWNS
@@ -305,6 +376,23 @@ class AdsInsights(FBMarketingIncrementalStream):
     breakdowns = []

+    fields_to_transform = (
+        (int, ("clicks", "impressions", "reach", "unique_clicks")),
+        (float, ("frequency", "social_spend", "spend", "wish_bid")),
+        (list, (
+            ("actions", (
+                (int, ("1d_click", "7d_click", "28d_click")),
+                (float, ("value",)),
+            )),
+            ("unique_actions", (
+                (int, ("1d_click", "7d_click", "28d_click")),
+                (float, ("value",)),
+            )),
+        )),
+    )
+
     def __init__(self, buffer_days, days_per_job, **kwargs):
         super().__init__(**kwargs)
         self.lookback_window = pendulum.duration(days=buffer_days)
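The hunk declares `fields_to_transform` but doesn't show the code that consumes it. One plausible reading of the spec — cast named scalar fields, and recurse into named list fields — is sketched below with a hypothetical helper (not part of the PR):

```python
# Hypothetical consumer of a fields_to_transform-style spec; the PR does not show
# how AdsInsights applies it, so this is an assumed reading of the structure.
def apply_transforms(record: dict, spec: tuple) -> dict:
    for target_type, fields in spec:
        if target_type is list:
            # nested spec: (field_name, inner_spec) pairs applied to each list item
            for field_name, inner_spec in fields:
                for item in record.get(field_name) or []:
                    apply_transforms(item, inner_spec)
        else:
            for field_name in fields:
                if field_name in record and record[field_name] is not None:
                    record[field_name] = target_type(record[field_name])
    return record

spec = (
    (int, ("clicks",)),
    (float, ("spend",)),
    (list, (("actions", ((int, ("7d_click",)), (float, ("value",)))),)),
)
record = {"clicks": "3", "spend": "1.50", "actions": [{"value": "0.4", "7d_click": "2"}]}
assert apply_transforms(record, spec) == {"clicks": 3, "spend": 1.5, "actions": [{"value": 0.4, "7d_click": 2}]}
```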
@@ -322,7 +410,7 @@ def read_records(
         # because we query `lookback_window` days before the actual cursor we might get records older than the cursor

         for obj in result.get_result():
-            yield obj.export_all_data()
+            yield self.transform(obj.export_all_data())

     def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
         """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
@@ -353,7 +441,7 @@ def wait_for_job(self, job) -> AdReportRun:
             job = job.api_get()
             job_progress_pct = job["async_percent_completion"]
             job_id = job["report_run_id"]
-            self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
+            self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
             runtime = pendulum.now() - start_time

             if job["async_status"] == "Job Completed":