diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 554e70550e2c7..1f92cf15f9f6c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -634,7 +634,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 0.1.20 + dockerImageTag: 0.1.21 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 81058ab661a7a..d02bdba95bb95 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6731,7 +6731,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:0.1.20" +- dockerImage: "airbyte/source-salesforce:0.1.21" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 13a78f12d7def..3b9f1ddc2fa05 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.20 +LABEL io.airbyte.version=0.1.21 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py 
b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py index 88c19292e5d15..766fd90c4d4cb 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py @@ -34,7 +34,7 @@ def should_give_up(exc): if exc.response is not None and exc.response.status_code == codes.forbidden: error_data = exc.response.json()[0] if error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED": - give_up = False + give_up = True if give_up: logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.text}") diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 1784d27407e32..e7edfb658f6c9 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -11,6 +11,7 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.sources.utils.schema_helpers import split_config +from requests import codes, exceptions from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream @@ -24,12 +25,24 @@ def _get_sf_object(config: Mapping[str, Any]) -> Salesforce: return sf def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: - _ = self._get_sf_object(config) - return True, None + try: + _ = self._get_sf_object(config) + return True, None + except exceptions.HTTPError as error: + error_data = error.response.json()[0] + error_code = error_data.get("errorCode") + if error.response.status_code == 
codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": + logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") + return False, "API Call limit is exceeded" @classmethod def generate_streams( - cls, config: Mapping[str, Any], stream_names: List[str], sf_object: Salesforce, state: Mapping[str, Any] = None, stream_objects: List = None + cls, + config: Mapping[str, Any], + stream_names: List[str], + sf_object: Salesforce, + state: Mapping[str, Any] = None, + stream_objects: List = None, ) -> List[Stream]: """ "Generates a list of stream by their names. It can be used for different tests too""" authenticator = TokenAuthenticator(sf_object.access_token) @@ -96,6 +109,14 @@ def read( connector_state=connector_state, internal_config=internal_config, ) + except exceptions.HTTPError as error: + error_data = error.response.json()[0] + error_code = error_data.get("errorCode") + if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": + logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'") + break # if got 403 rate limit response, finish the sync with success. 
+ raise error + except Exception as e: logger.exception(f"Encountered an exception while reading stream {self.name}") raise e diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/configured_catalog.json b/airbyte-integrations/connectors/source-salesforce/unit_tests/configured_catalog.json new file mode 100644 index 0000000000000..647831c8b86f8 --- /dev/null +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/configured_catalog.json @@ -0,0 +1,28 @@ +{ + "streams": [ + { + "stream": { + "name": "Account", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["LastModifiedDate"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "Asset", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["SystemModstamp"], + "source_defined_primary_key": [["Id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py index 85ba0850eed3d..b70b9ddbf1305 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py @@ -4,17 +4,32 @@ import csv import io +import json from unittest.mock import Mock import pytest import requests_mock -from airbyte_cdk.models import SyncMode +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type from requests.exceptions import HTTPError from source_salesforce.api import Salesforce from source_salesforce.source import SourceSalesforce from source_salesforce.streams import BulkIncrementalSalesforceStream, 
BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream +@pytest.fixture(scope="module") +def configured_catalog(): + with open("unit_tests/configured_catalog.json") as f: + data = json.loads(f.read()) + return ConfiguredAirbyteCatalog.parse_obj(data) + + +@pytest.fixture(scope="module") +def state(): + state = {"Account": {"LastModifiedDate": "2021-10-01T21:18:20.000Z"}, "Asset": {"SystemModstamp": "2021-10-02T05:08:29.000Z"}} + return state + + @pytest.fixture(scope="module") def stream_config(): """Generates streams settings for BULK logic""" @@ -319,6 +334,151 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter assert sorted(filtered_streams) == sorted(predicted_filtered_streams) +def test_check_connection_rate_limit(stream_config): + source = SourceSalesforce() + logger = AirbyteLogger() + + json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}] + url = "https://login.salesforce.com/services/oauth2/token" + with requests_mock.Mocker() as m: + m.register_uri("POST", url, json=json_response, status_code=403) + result, msg = source.check_connection(logger, stream_config) + assert result is False + assert msg == "API Call limit is exceeded" + + +def configure_request_params_mock(stream_1, stream_2): + stream_1.request_params = Mock() + stream_1.request_params.return_value = {"q": "query"} + + stream_2.request_params = Mock() + stream_2.request_params.return_value = {"q": "query"} + + +def test_rate_limit_bulk(stream_config, stream_api, configured_catalog, state): + """ + Connector should stop the sync if one stream reached rate limit + stream_1, stream_2, stream_3, ... + While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. + Next streams should not be executed. 
+ """ + stream_1: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api) + stream_2: BulkIncrementalSalesforceStream = _generate_stream("Asset", stream_config, stream_api) + streams = [stream_1, stream_2] + configure_request_params_mock(stream_1, stream_2) + + stream_1.page_size = 6 + stream_1.state_checkpoint_interval = 5 + + source = SourceSalesforce() + source.streams = Mock() + source.streams.return_value = streams + logger = AirbyteLogger() + + json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}] + with requests_mock.Mocker() as m: + for stream in streams: + creation_responses = [] + for page in [1, 2]: + job_id = f"fake_job_{page}_{stream.name}" + creation_responses.append({"json": {"id": job_id}}) + + m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) + + resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)] # 6 records per page + + if page == 1: + # Read the first page successfully + m.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join(resp)) + else: + # Requesting for results when reading second page should fail with 403 (Rate Limit error) + m.register_uri("GET", stream.path() + f"/{job_id}/results", status_code=403, json=json_response) + + m.register_uri("DELETE", stream.path() + f"/{job_id}") + + m.register_uri("POST", stream.path(), creation_responses) + + result = [i for i in source.read(logger=logger, config=stream_config, catalog=configured_catalog, state=state)] + assert stream_1.request_params.called + assert ( + not stream_2.request_params.called + ), "The second stream should not be executed, because the first stream finished with Rate Limit." 
+ + records = [item for item in result if item.type == Type.RECORD] + assert len(records) == 6 # stream page size: 6 + + state_record = [item for item in result if item.type == Type.STATE][0] + assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-05" # state checkpoint interval is 5. + + +def test_rate_limit_rest(stream_config, stream_api, configured_catalog, state): + """ + Connector should stop the sync if one stream reached rate limit + stream_1, stream_2, stream_3, ... + While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. + Next streams should not be executed. + """ + + stream_1: IncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api, state=state) + stream_2: IncrementalSalesforceStream = _generate_stream("Asset", stream_config, stream_api, state=state) + + stream_1.state_checkpoint_interval = 3 + configure_request_params_mock(stream_1, stream_2) + + source = SourceSalesforce() + source.streams = Mock() + source.streams.return_value = [stream_1, stream_2] + + logger = AirbyteLogger() + + next_page_url = "/services/data/v52.0/query/012345" + response_1 = { + "done": False, + "totalSize": 10, + "nextRecordsUrl": next_page_url, + "records": [ + { + "ID": 1, + "LastModifiedDate": "2021-11-15", + }, + { + "ID": 2, + "LastModifiedDate": "2021-11-16", + }, + { + "ID": 3, + "LastModifiedDate": "2021-11-17", # check point interval + }, + { + "ID": 4, + "LastModifiedDate": "2021-11-18", + }, + { + "ID": 5, + "LastModifiedDate": "2021-11-19", + }, + ], + } + response_2 = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}] + + with requests_mock.Mocker() as m: + m.register_uri("GET", stream_1.path(), json=response_1, status_code=200) + m.register_uri("GET", next_page_url, json=response_2, status_code=403) + + result = [i for i in source.read(logger=logger, config=stream_config, catalog=configured_catalog, 
state=state)] + + assert stream_1.request_params.called + assert ( + not stream_2.request_params.called + ), "The second stream should not be executed, because the first stream finished with Rate Limit." + + records = [item for item in result if item.type == Type.RECORD] + assert len(records) == 5 + + state_record = [item for item in result if item.type == Type.STATE][0] + assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-17" + + def test_discover_only_queryable(stream_config): sf_object = Salesforce(**stream_config) sf_object.login = Mock() diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 2650bcf609255..3f663a03ed811 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -21,7 +21,9 @@ Several output streams are available from this source. A list of these streams c ### Performance considerations -The connector is restricted by normal Salesforce rate limiting. For large transfers we recommend using the BULK API. +The connector is restricted by daily Salesforce rate limiting. +The connector uses as much rate limit as it can every day, then ends the sync early with success status and continues the sync from where it left the next time. +Note that, picking up from where it ends will work only for incremental sync. ## Getting started @@ -737,6 +739,7 @@ List of available streams: | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:--------------------------------------------------------------------------| +| 0.1.21 | 2022-01-28 | [9499](https://github.com/airbytehq/airbyte/pull/9499) | If a sync reaches daily rate limit it ends the sync early with success status. 
Read more in `Performance considerations` section | | 0.1.20 | 2022-01-26 | [9757](https://github.com/airbytehq/airbyte/pull/9757) | Parse CSV with "unix" dialect | | 0.1.19 | 2022-01-25 | [8617](https://github.com/airbytehq/airbyte/pull/8617) | Update connector fields title/description | | 0.1.18 | 2022-01-20 | [9478](https://github.com/airbytehq/airbyte/pull/9478) | Add available stream filtering by `queryable` flag |