Skip to content

Source Salesforce: finish the sync with success if rate limit is reached #9499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def should_give_up(exc):
if exc.response is not None and exc.response.status_code == codes.forbidden:
error_data = exc.response.json()[0]
if error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED":
give_up = False
give_up = True
Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The custom backoff handler is used in calls to the BULK API and in the describe/login methods. If a 403 (Rate Limit) response is received, there is no need to sleep and retry.


if give_up:
logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.text}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import split_config
from requests import codes, exceptions

from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream
Expand All @@ -24,8 +25,15 @@ def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
return sf

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
_ = self._get_sf_object(config)
return True, None
try:
_ = self._get_sf_object(config)
return True, None
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
return False, "API Call limit is exceeded"

@classmethod
def generate_streams(
Expand Down Expand Up @@ -96,6 +104,14 @@ def read(
connector_state=connector_state,
internal_config=internal_config,
)
except exceptions.HTTPError as error:
error_data = error.response.json()[0]
error_code = error_data.get("errorCode")
if error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
logger.warn(f"API Call limit is exceeded. Error message: '{error_data.get('message')}'")
break # if got 403 rate limit response, finish the sync with success.
Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connector should stop the sync if one stream reaches the rate limit.
Suppose the connector has stream_1, stream_2, and stream_3.
If a 403 (Rate Limit) is received while reading stream_1, the connector should finish that stream with success and stop the sync process.
The following streams should not be executed; to achieve that, we break out of the loop.

raise error

except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
raise e
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"streams": [
{
"stream": {
"name": "Account",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["LastModifiedDate"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "Asset",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,32 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from unittest.mock import Mock

import pytest
import requests_mock
from airbyte_cdk.models import SyncMode
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
from requests.exceptions import HTTPError
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, IncrementalSalesforceStream, SalesforceStream


@pytest.fixture(scope="module")
def configured_catalog():
    """Load the configured catalog fixture (Account + Asset incremental streams) from disk.

    Uses ``json.load(f)`` instead of ``json.loads(f.read())`` — same result,
    without materializing the file contents as an intermediate string.
    """
    with open("unit_tests/configured_catalog.json") as f:
        return ConfiguredAirbyteCatalog.parse_obj(json.load(f))


@pytest.fixture(scope="module")
def state():
    """Incremental sync state: the last cursor value recorded for each stream."""
    return {
        "Account": {"LastModifiedDate": "2021-10-01T21:18:20.000Z"},
        "Asset": {"SystemModstamp": "2021-10-02T05:08:29.000Z"},
    }


@pytest.fixture(scope="module")
def stream_config():
"""Generates streams settings for BULK logic"""
Expand Down Expand Up @@ -317,6 +332,151 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter
assert sorted(filtered_streams) == sorted(predicted_filtered_streams)


def test_check_connection_rate_limit(stream_config):
    """check_connection must report failure (not raise) when the Salesforce
    token endpoint answers 403 with a REQUEST_LIMIT_EXCEEDED error payload."""
    rate_limit_payload = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}]
    token_url = "https://login.salesforce.com/services/oauth2/token"
    source = SourceSalesforce()
    logger = AirbyteLogger()

    with requests_mock.Mocker() as mocker:
        mocker.register_uri("POST", token_url, json=rate_limit_payload, status_code=403)
        ok, message = source.check_connection(logger, stream_config)

    assert ok is False
    assert message == "API Call limit is exceeded"


def configure_request_params_mock(*streams):
    """Replace each stream's ``request_params`` with a Mock returning a fixed query.

    Generalized from the original two-positional-parameter version to accept any
    number of streams; existing two-argument call sites keep working unchanged.

    Args:
        *streams: stream objects whose ``request_params`` attribute is replaced.
    """
    for stream in streams:
        # Mock(return_value=...) is equivalent to assigning .return_value afterwards.
        stream.request_params = Mock(return_value={"q": "query"})


# NOTE(review): leading indentation was lost when this page was captured; the
# code tokens below are kept byte-identical and only comments were added.
def test_rate_limit_bulk(stream_config, stream_api, configured_catalog, state):
"""
Connector should stop the sync if one stream reached rate limit
stream_1, stream_2, stream_3, ...
While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process.
Next streams should not be executed.
"""
# Two BULK incremental streams; only the first one should actually be read.
stream_1: BulkIncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api)
stream_2: BulkIncrementalSalesforceStream = _generate_stream("Asset", stream_config, stream_api)
streams = [stream_1, stream_2]
configure_request_params_mock(stream_1, stream_2)

# Small page size and checkpoint interval so a STATE message is emitted
# after record 5, before the 403 interrupts the second page.
stream_1.page_size = 6
stream_1.state_checkpoint_interval = 5

# Force the source to return exactly these two streams, in this order.
source = SourceSalesforce()
source.streams = Mock()
source.streams.return_value = streams
logger = AirbyteLogger()

# Salesforce REQUEST_LIMIT_EXCEEDED error payload, served with HTTP 403.
json_response = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}]
with requests_mock.Mocker() as m:
for stream in streams:
creation_responses = []
for page in [1, 2]:
job_id = f"fake_job_{page}_{stream.name}"
creation_responses.append({"json": {"id": job_id}})

# Job-status endpoint: report the BULK job as already finished.
m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"})

resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)]  # 6 records per page

if page == 1:
# Read the first page successfully
m.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join(resp))
else:
# Requesting for results when reading second page should fail with 403 (Rate Limit error)
m.register_uri("GET", stream.path() + f"/{job_id}/results", status_code=403, json=json_response)

m.register_uri("DELETE", stream.path() + f"/{job_id}")

# Job-creation endpoint: hand out fake_job_1 then fake_job_2 in sequence.
m.register_uri("POST", stream.path(), creation_responses)

result = [i for i in source.read(logger=logger, config=stream_config, catalog=configured_catalog, state=state)]
# Stream 1 was read; stream 2 must never have been started.
assert stream_1.request_params.called
assert (
not stream_2.request_params.called
), "The second stream should not be executed, because the first stream finished with Rate Limit."

# Only the first page's records made it out before the rate limit hit.
records = [item for item in result if item.type == Type.RECORD]
assert len(records) == 6  # stream page size: 6

# The checkpoint taken at record 5 must survive the early stop.
state_record = [item for item in result if item.type == Type.STATE][0]
assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-05"  # state checkpoint interval is 5.


# NOTE(review): leading indentation was lost when this page was captured; the
# code tokens below are kept byte-identical and only comments were added.
def test_rate_limit_rest(stream_config, stream_api, configured_catalog, state):
"""
Connector should stop the sync if one stream reached rate limit
stream_1, stream_2, stream_3, ...
While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process.
Next streams should not be executed.
"""

# Two REST incremental streams; only the first one should actually be read.
stream_1: IncrementalSalesforceStream = _generate_stream("Account", stream_config, stream_api, state=state)
stream_2: IncrementalSalesforceStream = _generate_stream("Asset", stream_config, stream_api, state=state)

# Checkpoint after every 3 records so a STATE message exists before the 403.
stream_1.state_checkpoint_interval = 3
configure_request_params_mock(stream_1, stream_2)

# Force the source to return exactly these two streams, in this order.
source = SourceSalesforce()
source.streams = Mock()
source.streams.return_value = [stream_1, stream_2]

logger = AirbyteLogger()

# First page: 5 records plus a nextRecordsUrl pointing at the second page.
next_page_url = "/services/data/v52.0/query/012345"
response_1 = {
"done": False,
"totalSize": 10,
"nextRecordsUrl": next_page_url,
"records": [
{
"ID": 1,
"LastModifiedDate": "2021-11-15",
},
{
"ID": 2,
"LastModifiedDate": "2021-11-16",
},
{
"ID": 3,
"LastModifiedDate": "2021-11-17", # check point interval
},
{
"ID": 4,
"LastModifiedDate": "2021-11-18",
},
{
"ID": 5,
"LastModifiedDate": "2021-11-19",
},
],
}
# Second page: Salesforce REQUEST_LIMIT_EXCEEDED payload with HTTP 403.
response_2 = [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "TotalRequests Limit exceeded."}]

with requests_mock.Mocker() as m:
m.register_uri("GET", stream_1.path(), json=response_1, status_code=200)
m.register_uri("GET", next_page_url, json=response_2, status_code=403)

result = [i for i in source.read(logger=logger, config=stream_config, catalog=configured_catalog, state=state)]

# Stream 1 was read; stream 2 must never have been started.
assert stream_1.request_params.called
assert (
not stream_2.request_params.called
), "The second stream should not be executed, because the first stream finished with Rate Limit."

# All 5 first-page records were emitted before the rate limit hit.
records = [item for item in result if item.type == Type.RECORD]
assert len(records) == 5

# The checkpoint taken at record 3 must survive the early stop.
state_record = [item for item in result if item.type == Type.STATE][0]
assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-17"


def test_pagination_rest(stream_config, stream_api):
stream_name = "ActiveFeatureLicenseMetric"
state = {stream_name: {"SystemModstamp": "2122-08-22T05:08:29.000Z"}}
Expand Down