-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Source Salesforce: finish the sync with success if rate limit is reached #9499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
44e5671
ed3ca08
72c9cb8
6fb027c
71321d1
f57d2be
dad31bb
4aed7d2
747a719
64a87bd
7536856
ede1ad2
1ad1117
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
from airbyte_cdk.sources.streams import Stream | ||
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator | ||
from airbyte_cdk.sources.utils.schema_helpers import split_config | ||
from requests import codes, exceptions | ||
|
||
from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce | ||
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream | ||
|
@@ -24,8 +25,15 @@ def _get_sf_object(config: Mapping[str, Any]) -> Salesforce: | |
return sf | ||
|
||
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: | ||
_ = self._get_sf_object(config) | ||
return True, None | ||
try: | ||
_ = self._get_sf_object(config) | ||
return True, None | ||
except exceptions.HTTPError as error: | ||
error_data = error.response.json()[0] | ||
error_code = error_data.get("errorCode") | ||
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": | ||
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") | ||
return False, "API Call limit is exceeded" | ||
|
||
@classmethod | ||
def generate_streams( | ||
|
@@ -96,6 +104,14 @@ def read( | |
connector_state=connector_state, | ||
internal_config=internal_config, | ||
) | ||
except exceptions.HTTPError as error: | ||
error_data = error.response.json()[0] | ||
error_code = error_data.get("errorCode") | ||
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": | ||
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") | ||
break # if got 403 rate limit response, finish the sync with success. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Connector should stop the sync if one stream reached rate limit |
||
raise error | ||
|
||
except Exception as e: | ||
logger.exception(f"Encountered an exception while reading stream {self.name}") | ||
raise e | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
{ | ||
"streams": [ | ||
{ | ||
"stream": { | ||
"name": "Account", | ||
"json_schema": {}, | ||
"supported_sync_modes": ["full_refresh", "incremental"], | ||
"source_defined_cursor": true, | ||
"default_cursor_field": ["LastModifiedDate"], | ||
"source_defined_primary_key": [["Id"]] | ||
}, | ||
"sync_mode": "incremental", | ||
"destination_sync_mode": "append" | ||
}, | ||
{ | ||
"stream": { | ||
"name": "Asset", | ||
"json_schema": {}, | ||
"supported_sync_modes": ["full_refresh", "incremental"], | ||
"source_defined_cursor": true, | ||
"default_cursor_field": ["SystemModstamp"], | ||
"source_defined_primary_key": [["Id"]] | ||
}, | ||
"sync_mode": "incremental", | ||
"destination_sync_mode": "append" | ||
} | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Custom backoff handler is used in call calls BULK API and describe/login methods. if 403 (Rate Limit) is received no need in sleeping.