8
8
from typing import Any , Iterable , Mapping , Optional
9
9
10
10
import requests
11
+ from airbyte_cdk .sources .streams .http .error_handlers .response_models import ResponseAction
11
12
from requests .exceptions import JSONDecodeError
12
13
13
14
from .utils import API_LIMIT_PER_HOUR
@@ -22,9 +23,8 @@ class GoogleAnalyticsApiQuotaBase:
22
23
# setting the scenario values for attrs prior to the 429 error
23
24
treshold : float = 0.1
24
25
# base attrs
25
- should_retry : Optional [ bool ] = True
26
+ response_action : ResponseAction = ResponseAction . RETRY
26
27
backoff_time : Optional [int ] = None
27
- raise_on_http_errors : bool = True
28
28
# stop making new slices globally
29
29
stop_iter : bool = False
30
30
error_message = None
@@ -33,23 +33,20 @@ class GoogleAnalyticsApiQuotaBase:
33
33
"concurrentRequests" : {
34
34
"error_pattern" : "Exhausted concurrent requests quota." ,
35
35
"backoff" : 30 ,
36
- "should_retry" : True ,
37
- "raise_on_http_errors" : False ,
36
+ "response_action" : ResponseAction .RETRY ,
38
37
"stop_iter" : False ,
39
38
},
40
39
"tokensPerProjectPerHour" : {
41
40
"error_pattern" : "Exhausted property tokens for a project per hour." ,
42
41
"backoff" : 1800 ,
43
- "should_retry" : True ,
44
- "raise_on_http_errors" : False ,
42
+ "response_action" : ResponseAction .RETRY ,
45
43
"stop_iter" : False ,
46
44
"error_message" : API_LIMIT_PER_HOUR ,
47
45
},
48
46
"potentiallyThresholdedRequestsPerHour" : {
49
47
"error_pattern" : "Exhausted potentially thresholded requests quota." ,
50
48
"backoff" : 1800 ,
51
- "should_retry" : True ,
52
- "raise_on_http_errors" : False ,
49
+ "response_action" : ResponseAction .RETRY ,
53
50
"stop_iter" : False ,
54
51
"error_message" : API_LIMIT_PER_HOUR ,
55
52
},
@@ -61,22 +58,19 @@ class GoogleAnalyticsApiQuotaBase:
61
58
# 'tokensPerDay': {
62
59
# 'error_pattern': "___",
63
60
# "backoff": None,
64
- # "should_retry": False,
65
- # "raise_on_http_errors": False,
61
+ # "response_action": ResponseAction.FAIL,
66
62
# "stop_iter": True,
67
63
# },
68
64
# 'tokensPerHour': {
69
65
# 'error_pattern': "___",
70
66
# "backoff": 1800,
71
- # "should_retry": True,
72
- # "raise_on_http_errors": False,
67
+ # "response_action": ResponseAction.RETRY,
73
68
# "stop_iter": False,
74
69
# },
75
70
# 'serverErrorsPerProjectPerHour': {
76
71
# 'error_pattern': "___",
77
72
# "backoff": 3600,
78
- # "should_retry": True,
79
- # "raise_on_http_errors": False,
73
+ # "response_action": ResponseAction.RETRY,
80
74
# "stop_iter": False,
81
75
# },
82
76
}
@@ -105,16 +99,14 @@ def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> M
105
99
def _set_retry_attrs_for_quota (self , quota_name : str ) -> None :
106
100
quota = self .quota_mapping .get (quota_name , {})
107
101
if quota :
108
- self .should_retry = quota .get ("should_retry" )
109
- self .raise_on_http_errors = quota .get ("raise_on_http_errors" )
102
+ self .response_action = quota .get ("response_action" )
110
103
self .stop_iter = quota .get ("stop_iter" )
111
104
self .backoff_time = quota .get ("backoff" )
112
- self .error_message = quota .get ("error_message" )
105
+ self .error_message = quota .get ("error_message" , quota . get ( "error_pattern" ) )
113
106
114
- def _set_default_retry_attrs (self ) -> None :
115
- self .should_retry = True
107
+ def _set_default_handle_error_attrs (self ) -> None :
108
+ self .response_action = ResponseAction . RETRY
116
109
self .backoff_time = None
117
- self .raise_on_http_errors = True
118
110
self .stop_iter = False
119
111
120
112
def _set_initial_quota (self , current_quota : Optional [Mapping [str , Any ]] = None ) -> None :
@@ -137,7 +129,7 @@ def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None:
137
129
def _check_for_errors (self , response : requests .Response ) -> None :
138
130
try :
139
131
# revert to default values after successul retry
140
- self ._set_default_retry_attrs ()
132
+ self ._set_default_handle_error_attrs ()
141
133
error = response .json ().get ("error" )
142
134
if error :
143
135
quota_name = self ._get_quota_name_from_error_message (error .get ("message" ))
@@ -161,7 +153,7 @@ def _check_quota(self, response: requests.Response):
161
153
try :
162
154
parsed_response = response .json ()
163
155
except (AttributeError , JSONDecodeError ) as e :
164
- self .logger .warn (
156
+ self .logger .warning (
165
157
f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: { e } . Bypassing."
166
158
)
167
159
parsed_response = {}
@@ -170,7 +162,7 @@ def _check_quota(self, response: requests.Response):
170
162
if property_quota :
171
163
# return default attrs values once successfully retried
172
164
# or until another 429 error is hit
173
- self ._set_default_retry_attrs ()
165
+ self ._set_default_handle_error_attrs ()
174
166
# reduce quota list to known kinds only
175
167
current_quota = self ._get_known_quota_from_response (property_quota )
176
168
if current_quota :
@@ -183,7 +175,7 @@ def _check_quota(self, response: requests.Response):
183
175
184
176
def handle_quota (self ) -> None :
185
177
"""
186
- The function decorator is used to integrate with the `should_retry ` method,
178
+ The function decorator is used to integrate with the `interpret_response ` method,
187
179
or any other method that provides early access to the `response` object.
188
180
"""
189
181
0 commit comments