4
4
5
5
6
6
import traceback
7
- from typing import Any , List , Mapping , Tuple , Union
7
+ from typing import Any , Iterable , List , Mapping , MutableMapping , Tuple
8
8
9
9
from airbyte_cdk import AirbyteLogger
10
10
from airbyte_cdk .models import SyncMode
11
11
from airbyte_cdk .sources import AbstractSource
12
12
from airbyte_cdk .sources .streams import Stream
13
13
from google .ads .googleads .errors import GoogleAdsException
14
14
from pendulum import parse , timezone , today
15
- from pendulum .tz .timezone import Timezone
16
15
17
16
from .custom_query_stream import CustomQuery
18
17
from .google_ads import GoogleAds
18
+ from .models import Customer
19
19
from .streams import (
20
20
AccountPerformanceReport ,
21
21
Accounts ,
22
+ ServiceAccounts ,
22
23
AdGroupAdLabels ,
23
24
AdGroupAdReport ,
24
25
AdGroupAds ,
38
39
39
40
class SourceGoogleAds (AbstractSource ):
40
41
@staticmethod
41
- def get_credentials (config : Mapping [str , Any ]) -> Mapping [str , Any ]:
42
+ def get_credentials (config : Mapping [str , Any ]) -> MutableMapping [str , Any ]:
42
43
credentials = config ["credentials" ]
43
44
# use_proto_plus is set to True, because setting to False returned wrong value types, which breakes the backward compatibility.
44
45
# For more info read the related PR's description: https://github.com/airbytehq/airbyte/pull/9996
@@ -50,36 +51,25 @@ def get_credentials(config: Mapping[str, Any]) -> Mapping[str, Any]:
50
51
return credentials
51
52
52
53
@staticmethod
53
- def get_incremental_stream_config (google_api : GoogleAds , config : Mapping [str , Any ], tz : Union [ timezone , str ] = "local" ):
54
+ def get_incremental_stream_config (google_api : GoogleAds , config : Mapping [str , Any ], customers : List [ Customer ] ):
54
55
true_end_date = None
55
56
configured_end_date = config .get ("end_date" )
56
57
if configured_end_date is not None :
57
58
true_end_date = min (today (), parse (configured_end_date )).to_date_string ()
58
59
incremental_stream_config = dict (
59
60
api = google_api ,
61
+ customers = customers ,
60
62
conversion_window_days = config ["conversion_window_days" ],
61
63
start_date = config ["start_date" ],
62
- time_zone = tz ,
63
64
end_date = true_end_date ,
64
65
)
65
66
return incremental_stream_config
66
67
67
- def get_account_info (self , google_api : GoogleAds , config : Mapping [str , Any ]) -> dict :
68
- incremental_stream_config = self .get_incremental_stream_config (google_api , config )
69
- accounts_streams = Accounts (** incremental_stream_config )
70
- for stream_slice in accounts_streams .stream_slices (sync_mode = SyncMode .full_refresh ):
71
- return next (accounts_streams .read_records (sync_mode = SyncMode .full_refresh , stream_slice = stream_slice ), {})
72
-
73
- @staticmethod
74
- def get_time_zone (account : dict ) -> Union [timezone , str ]:
75
- time_zone_name = account .get ("customer.time_zone" )
76
- if time_zone_name :
77
- return Timezone (time_zone_name )
78
- return "local"
79
-
80
- @staticmethod
81
- def is_manager_account (account : dict ) -> bool :
82
- return bool (account .get ("customer.manager" ))
68
+ def get_account_info (self , google_api : GoogleAds , config : Mapping [str , Any ]) -> Iterable [Iterable [Mapping [str , Any ]]]:
69
+ dummy_customers = [Customer (id = _id ) for _id in config ["customer_id" ].split ("," )]
70
+ accounts_stream = ServiceAccounts (google_api , customers = dummy_customers )
71
+ for slice_ in accounts_stream .stream_slices ():
72
+ yield accounts_stream .read_records (sync_mode = SyncMode .full_refresh , stream_slice = slice_ )
83
73
84
74
@staticmethod
85
75
def is_metrics_in_custom_query (query : str ) -> bool :
@@ -92,42 +82,42 @@ def is_metrics_in_custom_query(query: str) -> bool:
92
82
def check_connection (self , logger : AirbyteLogger , config : Mapping [str , Any ]) -> Tuple [bool , any ]:
93
83
try :
94
84
logger .info ("Checking the config" )
95
- google_api = GoogleAds (credentials = self .get_credentials (config ), customer_id = config ["customer_id" ])
96
- account_info = self .get_account_info (google_api , config )
97
- is_manager_account = self .is_manager_account (account_info )
85
+ google_api = GoogleAds (credentials = self .get_credentials (config ))
98
86
87
+ accounts = self .get_account_info (google_api , config )
88
+ customers = Customer .from_accounts (accounts )
99
89
# Check custom query request validity by sending metric request with non-existant time window
100
- for query in config .get ("custom_queries" , []):
101
- query = query .get ("query" )
102
-
103
- if is_manager_account and self .is_metrics_in_custom_query (query ):
104
- return False , f"Metrics are not available for manager account. Check fields in your custom query: { query } "
105
- if CustomQuery .cursor_field in query :
106
- return False , f"Custom query should not contain { CustomQuery .cursor_field } "
107
-
108
- req_q = CustomQuery .insert_segments_date_expr (query , "1980-01-01" , "1980-01-01" )
109
- for customer_id in google_api .customer_ids :
110
- google_api .send_request (req_q , customer_id = customer_id )
90
+ for customer in customers :
91
+ for query in config .get ("custom_queries" , []):
92
+ query = query .get ("query" )
93
+ if customer .is_manager_account and self .is_metrics_in_custom_query (query ):
94
+ return False , f"Metrics are not available for manager account. Check fields in your custom query: { query } "
95
+ if CustomQuery .cursor_field in query :
96
+ return False , f"Custom query should not contain { CustomQuery .cursor_field } "
97
+ req_q = CustomQuery .insert_segments_date_expr (query , "1980-01-01" , "1980-01-01" )
98
+ response = google_api .send_request (req_q , customer_id = customer .id )
99
+ # iterate over the response otherwise exceptions will not be raised!
100
+ for _ in response :
101
+ pass
111
102
return True , None
112
103
except GoogleAdsException as exception :
113
104
error_messages = ", " .join ([error .message for error in exception .failure .errors ])
114
105
logger .error (traceback .format_exc ())
115
106
return False , f"Unable to connect to Google Ads API with the provided credentials - { error_messages } "
116
107
117
108
def streams (self , config : Mapping [str , Any ]) -> List [Stream ]:
118
- google_api = GoogleAds (credentials = self .get_credentials (config ), customer_id = config ["customer_id" ])
119
- account_info = self .get_account_info (google_api , config )
120
- time_zone = self .get_time_zone (account_info )
121
- incremental_stream_config = self .get_incremental_stream_config (google_api , config , tz = time_zone )
122
-
109
+ google_api = GoogleAds (credentials = self .get_credentials (config ))
110
+ accounts = self .get_account_info (google_api , config )
111
+ customers = Customer .from_accounts (accounts )
112
+ incremental_stream_config = self .get_incremental_stream_config (google_api , config , customers )
123
113
streams = [
124
114
AdGroupAds (** incremental_stream_config ),
125
- AdGroupAdLabels (google_api ),
115
+ AdGroupAdLabels (google_api , customers = customers ),
126
116
AdGroups (** incremental_stream_config ),
127
- AdGroupLabels (google_api ),
117
+ AdGroupLabels (google_api , customers = customers ),
128
118
Accounts (** incremental_stream_config ),
129
119
Campaigns (** incremental_stream_config ),
130
- CampaignLabels (google_api ),
120
+ CampaignLabels (google_api , customers = customers ),
131
121
ClickView (** incremental_stream_config ),
132
122
]
133
123
custom_query_streams = [
@@ -137,7 +127,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
137
127
streams .extend (custom_query_streams )
138
128
139
129
# Metrics streams cannot be requested for a manager account.
140
- if not self .is_manager_account (account_info ):
130
+ non_manager_accounts = [customer for customer in customers if not customer .is_manager_account ]
131
+ if non_manager_accounts :
132
+ incremental_stream_config ["customers" ] = non_manager_accounts
141
133
streams .extend (
142
134
[
143
135
UserLocationReport (** incremental_stream_config ),
0 commit comments