@@ -27,7 +27,7 @@
 from abc import ABC
 from collections import deque
 from datetime import datetime
-from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
+from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union
 
 import backoff
 import pendulum
@@ -46,7 +46,7 @@
 backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
 
 
-def remove_params_from_url(url: str, params: [str]) -> str:
+def remove_params_from_url(url: str, params: List[str]) -> str:
     """
     Parses a URL and removes the query parameters specified in params
     :param url: URL
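
The signature fix above only tightens the annotation; the helper's behavior is unchanged. A hypothetical call, assuming the function drops the named query parameters and returns the rest of the URL untouched:

remove_params_from_url("https://graph.facebook.com/v12.0/act_123/ads?limit=25&after=abc123", params=["after"])
# expected result: "https://graph.facebook.com/v12.0/act_123/ads?limit=25"
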
@@ -110,7 +110,63 @@ def read_records(
     ) -> Iterable[Mapping[str, Any]]:
         """Main read method used by CDK"""
         for record in self._read_records(params=self.request_params(stream_state=stream_state)):
-            yield self._extend_record(record, fields=self.fields)
+            yield self.transform(self._extend_record(record, fields=self.fields))
+
+    def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
+        """
+        Update field types in the record so they match the types declared in the schema.
+        """
+        schema = self.get_json_schema()
+        self.convert_to_schema_types(record, schema["properties"])
+        return record
+
+    def get_python_type(self, _types: Union[list, str]) -> tuple:
+        """Converts types from schema to python types. Examples:
+        - `["string", "null"]` will be converted to `(str,)`
+        - `["array", "string", "null"]` will be converted to `(list, str,)`
+        - `"boolean"` will be converted to `(bool,)`
+        """
+        types_mapping = {
+            "string": str,
+            "number": float,
+            "integer": int,
+            "object": dict,
+            "array": list,
+            "boolean": bool,
+        }
+
+        if isinstance(_types, list):
+            return tuple([types_mapping[t] for t in _types if t != "null"])
+
+        return (types_mapping[_types],)
+
+    def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
+        """
+        Converts each value in the record to the type declared for it in the schema. For example, the schema
+        declares `reach` as a `number`, but the API returns `reach` as a string; this function converts that
+        value from `string` to `number`, and does the same for every other field described by the schema.
+        """
+        if not schema:
+            return
+
+        for key, value in record.items():
+            if key not in schema:
+                continue
+
+            if isinstance(value, dict):
+                self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {}))
+            elif isinstance(value, list) and "items" in schema[key]:
+                for record_list_item in value:
+                    if list in self.get_python_type(schema[key]["items"]["type"]):
+                        # TODO Currently we don't have support for list of lists.
+                        pass
+                    elif dict in self.get_python_type(schema[key]["items"]["type"]):
+                        self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"])
+                    elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
+                        record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)
+            elif not isinstance(value, self.get_python_type(schema[key]["type"])):
+                record[key] = self.get_python_type(schema[key]["type"])[0](value)
 
     def _read_records(self, params: Mapping[str, Any]) -> Iterable:
         """Wrapper around query to backoff errors.
@@ -298,7 +354,7 @@ class AdsInsights(FBMarketingIncrementalStream):
     MAX_WAIT_TO_START = pendulum.duration(minutes=5)
     MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
     MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
-    MAX_ASYNC_JOBS = 3
+    MAX_ASYNC_JOBS = 10
     INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)
 
     action_breakdowns = ALL_ACTION_BREAKDOWNS
@@ -325,7 +381,7 @@ def read_records(
         # because we query `lookback_window` days before the actual cursor we might get records older than the cursor
 
         for obj in result.get_result():
-            yield obj.export_all_data()
+            yield self.transform(obj.export_all_data())
 
     def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
         """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
@@ -356,7 +412,7 @@ def wait_for_job(self, job) -> AdReportRun:
             job = job.api_get()
             job_progress_pct = job["async_percent_completion"]
             job_id = job["report_run_id"]
-            self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
+            self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
             runtime = pendulum.now() - start_time
 
             if job["async_status"] == "Job Completed":