
Salesforce: retry on download_data and create_stream_job #36385

Merged (12 commits) on Apr 3, 2024
airbyte-integrations/connectors/source-salesforce/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
-dockerImageTag: 2.4.0
+dockerImageTag: 2.4.1
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.0"
version = "2.4.1"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py
@@ -15,12 +15,21 @@
exceptions.ReadTimeout,
exceptions.ConnectionError,
exceptions.HTTPError,
# We've had a couple of customers hit ProtocolErrors, namely:
# * A self-managed instance during `BulkSalesforceStream.download_data`. This customer had an abnormally high number of ConnectionErrors,
#   which seems to indicate general problems with their network infrastructure. The exact error was: `urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(905 bytes read, 119 more expected)', IncompleteRead(905 bytes read, 119 more expected))`
# * A cloud customer with very long syncs. All of those syncs ended with the following error: `urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))`
# Without more information, we make these retryable in the hope that reissuing the same request will succeed.
exceptions.ChunkedEncodingError,
# We've seen examples where the response from Salesforce was not JSON; those were error cases. For example:
# https://github.com/airbytehq/airbyte-internal-issues/issues/6855. We assume this is an edge case and that a retry should help.
exceptions.JSONDecodeError,
)
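
For context, a quick standalone sketch (illustrative only, not part of this diff) of what this tuple buys: `backoff.on_exception` keeps retrying a callable for as long as it raises one of the listed exceptions.

```python
import backoff
from requests import exceptions

# Copied from the tuple above so this snippet runs standalone.
TRANSIENT_EXCEPTIONS = (
    exceptions.ReadTimeout,
    exceptions.ConnectionError,
    exceptions.HTTPError,
    exceptions.ChunkedEncodingError,
    exceptions.JSONDecodeError,
)

attempts = {"count": 0}

@backoff.on_exception(backoff.constant, TRANSIENT_EXCEPTIONS, interval=0, max_tries=5)
def flaky() -> str:
    # Fails twice with a retryable error, then succeeds on the third call.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise exceptions.ChunkedEncodingError("Connection broken")
    return "ok"

assert flaky() == "ok"
assert attempts["count"] == 3
```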

logger = logging.getLogger("airbyte")


-def default_backoff_handler(max_tries: int, factor: int, **kwargs):
+def default_backoff_handler(max_tries: int, backoff_method={"method": backoff.expo, "params": {"factor": 15}}, **kwargs):
[Comment from Contributor Author] It seems like individual parameters would be better. Let's see if we can do this even if the method can take different parameters.
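
For what it's worth, a hedged sketch of the "individual parameters" variant the comment floats; the names `wait_gen` and `wait_gen_params` are hypothetical, not part of this PR:

```python
import backoff

# Hypothetical alternative signature: the wait generator and its keyword
# arguments travel as two separate parameters instead of one dict.
def default_backoff_handler(max_tries: int, wait_gen=backoff.expo, wait_gen_params: dict = None, **kwargs):
    wait_gen_params = wait_gen_params if wait_gen_params is not None else {"factor": 15}
    return backoff.on_exception(
        wait_gen,
        Exception,  # the real handler passes TRANSIENT_EXCEPTIONS here
        jitter=None,
        max_tries=max_tries,
        **kwargs,
        **wait_gen_params,
    )
```

The trade-off is one extra keyword argument at every call site versus a dict whose shape a type checker cannot see.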

def log_retry_attempt(details):
_, exc, _ = sys.exc_info()
logger.info(str(exc))
@@ -40,12 +49,11 @@ def should_give_up(exc):
return give_up

return backoff.on_exception(
-backoff.expo,
+backoff_method["method"],
TRANSIENT_EXCEPTIONS,
jitter=None,
on_backoff=log_retry_attempt,
giveup=should_give_up,
max_tries=max_tries,
-factor=factor,
**kwargs,
+**backoff_method["params"],
)
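
A usage sketch, assuming `default_backoff_handler` is importable from this module; the two configurations shown are the ones this PR applies to `_send_http_request` and `download_data` respectively:

```python
import backoff
import requests

from source_salesforce.rate_limiting import default_backoff_handler

# Exponential waits with factor 15, as used for `_send_http_request`:
@default_backoff_handler(max_tries=5, backoff_method={"method": backoff.expo, "params": {"factor": 15}})
def fetch_json(url: str) -> dict:
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

# Constant 5-second waits, as used for `download_data`:
@default_backoff_handler(max_tries=5, backoff_method={"method": backoff.constant, "params": {"interval": 5}})
def fetch_text(url: str) -> str:
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text
```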

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
@@ -13,6 +13,7 @@
from contextlib import closing
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union

+import backoff
import pandas as pd
import pendulum
import requests # type: ignore[import]
@@ -31,7 +32,7 @@
from .api import PARENT_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
from .availability_strategy import SalesforceAvailabilityStrategy
from .exceptions import SalesforceException, TmpFileIOError
-from .rate_limiting import default_backoff_handler
+from .rate_limiting import TRANSIENT_EXCEPTIONS, default_backoff_handler

# https://stackoverflow.com/a/54517228
CSV_FIELD_SIZE_LIMIT = int(ctypes.c_ulong(-1).value // 2)
@@ -351,24 +352,38 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str:

transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)

-@default_backoff_handler(max_tries=5, factor=15)
+@default_backoff_handler(max_tries=5, backoff_method={"method": backoff.expo, "params": {"factor": 15}})
def _send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
"""
This method should be used when you don't need to read data from the HTTP body. Otherwise, you will have to retry when you
actually read the response buffer (which happens when calling `json` or `iter_content`).
"""
return self._non_retryable_send_http_request(method, url, json, headers, stream)

def _non_retryable_send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
if response.status_code not in [200, 204]:
self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
response.raise_for_status()
return response
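
A minimal sketch of the pattern this split enables (the `_Sketch` class and `_fetch_and_parse` name are illustrative, not from this PR): a caller that parses the response body attaches its own retry decorator and calls the non-retryable variant, so a single failure is never retried by two stacked loops. `_create_stream_job` below is the concrete instance.

```python
import backoff

from source_salesforce.rate_limiting import default_backoff_handler

class _Sketch:  # stand-in for BulkSalesforceStream
    @default_backoff_handler(max_tries=5, backoff_method={"method": backoff.expo, "params": {"factor": 15}})
    def _fetch_and_parse(self, url: str) -> dict:
        # `.json()` can raise transient errors too, so the retry decorator must
        # cover the body read; calling the non-retryable variant avoids stacking
        # two retry loops on top of the same request.
        return self._non_retryable_send_http_request("GET", url).json()
```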

@default_backoff_handler(max_tries=5, backoff_method={"method": backoff.expo, "params": {"factor": 15}})
def _create_stream_job(self, query: str, url: str) -> Optional[str]:
json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
response = self._non_retryable_send_http_request("POST", url, json=json)
job_id: str = response.json()["id"]
return job_id

def create_stream_job(self, query: str, url: str) -> Optional[str]:
"""
docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.html

Note that we want to retry on connection issues as well; those can occur when calling `.json()`. Even when a connection
error happens while handling an HTTPError, we retry, because otherwise we would not be able to take the right action.
"""
-json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
try:
-    response = self._send_http_request("POST", url, json=json)
-    job_id: str = response.json()["id"]
-    return job_id
+    return self._create_stream_job(query, url)
except exceptions.HTTPError as error:
if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]:
# Some streams can't be used by the BULK API. Every API version can have a custom list of
@@ -520,6 +535,7 @@ def get_response_encoding(self, headers) -> str:

return self.encoding

@default_backoff_handler(max_tries=5, backoff_method={"method": backoff.constant, "params": {"interval": 5}})
def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]:
"""
Retrieves the binary data result from a successfully executed job, using chunks, to avoid local memory limitations.
@@ -529,7 +545,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]:
"""
# set filepath for binary data from response
tmp_file = str(uuid.uuid4())
-with closing(self._send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open(
+with closing(self._non_retryable_send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open(
tmp_file, "wb"
) as data_file:
response_headers = response.headers
@@ -615,6 +631,7 @@ def read_records(
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
+call_count: int = 0,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
next_page_token = None
@@ -643,7 +660,16 @@
while True:
req = PreparedRequest()
req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator})
-tmp_file, response_encoding, response_headers = self.download_data(url=req.url)
+try:
+    tmp_file, response_encoding, response_headers = self.download_data(url=req.url)
+except TRANSIENT_EXCEPTIONS as exception:
+    if call_count != 0:
+        raise exception
+    call_count += 1
+    self.logger.warning("Downloading data failed even after retries. Retrying the whole job...")
+    yield from self.read_records(sync_mode, cursor_field, stream_slice, stream_state, call_count=call_count)
+    return

for record in self.read_with_chunks(tmp_file, response_encoding):
yield record
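
Distilled, the control flow above gives each results page up to two job attempts: `download_data` already retries five times with constant backoff, and a still-failing page triggers one whole-job retry via recursion, so a page can see at most ten download attempts before the error propagates. A standalone sketch, with `download` and `restart_job` as illustrative stand-ins for downloading one page and re-running `read_records`:

```python
from source_salesforce.rate_limiting import TRANSIENT_EXCEPTIONS

def read_with_job_retry(download, restart_job, call_count: int = 0):
    try:
        return download()
    except TRANSIENT_EXCEPTIONS as exception:
        if call_count != 0:
            # the whole job was already retried once; surface the error
            raise exception
        # re-create the bulk job and read again, marking that we retried
        return restart_job(call_count=call_count + 1)
```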


@@ -29,7 +29,7 @@
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_cdk.utils import AirbyteTracedException
from conftest import encoding_symbols_parameters, generate_stream
-from requests.exceptions import HTTPError
+from requests.exceptions import ChunkedEncodingError, HTTPError
from source_salesforce.api import Salesforce
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
from source_salesforce.source import SourceSalesforce
@@ -38,12 +38,19 @@
BulkIncrementalSalesforceStream,
BulkSalesforceStream,
BulkSalesforceSubStream,
Describe,
IncrementalRestSalesforceStream,
RestSalesforceStream,
SalesforceStream,
)

_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
_A_JSON_RESPONSE = {"id": "any id"}
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = {"state": "JobComplete"}
_A_PK = "a_pk"
_A_STREAM_NAME = "a_stream_name"

_NUMBER_OF_DOWNLOAD_TRIES = 5
_FIRST_CALL_FROM_JOB_CREATION = 1

_ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []})
_ANY_CONFIG = {}
_ANY_STATE = None
@@ -589,6 +596,52 @@ def test_csv_reader_dialect_unix():
assert result == data


@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_retryable_error_when_download_data_then_retry(send_http_request_patch):
send_http_request_patch.return_value.iter_content.side_effect = [HTTPError(), _A_CHUNKED_RESPONSE]
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).download_data(url="any url")
assert send_http_request_patch.call_count == 2


@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
sf_api = Mock()
sf_api.generate_schema.return_value = {}
sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
send_http_request_patch.return_value.iter_content.side_effect = HTTPError()

with pytest.raises(Exception):
list(BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=sf_api, pk=_A_PK).read_records(SyncMode.full_refresh))

assert send_http_request_patch.call_count == (len(job_creation_return_values) + _NUMBER_OF_DOWNLOAD_TRIES) * 2


@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_http_errors_when_create_stream_job_then_retry(send_http_request_patch):
send_http_request_patch.return_value.json.side_effect = [HTTPError(), _A_JSON_RESPONSE]
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")
assert send_http_request_patch.call_count == 2


@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_fail_with_http_errors_when_create_stream_job_then_handle_error(send_http_request_patch):
mocked_response = Mock()
mocked_response.status_code = 666
[Comment from Contributor] 👿

send_http_request_patch.return_value.json.side_effect = HTTPError(response=mocked_response)

with pytest.raises(HTTPError):
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")


@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_retryable_error_that_are_not_http_errors_when_create_stream_job_then_retry(send_http_request_patch):
send_http_request_patch.return_value.json.side_effect = [ChunkedEncodingError(), _A_JSON_RESPONSE]
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")
assert send_http_request_patch.call_count == 2


@pytest.mark.parametrize(
"stream_names,catalog_stream_names,",
(

airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
@@ -3,6 +3,7 @@
#

import json
+import pathlib
from typing import List
from unittest.mock import Mock

@@ -22,14 +23,14 @@ def time_sleep_mock(mocker):

@pytest.fixture(scope="module")
def bulk_catalog():
with open("unit_tests/bulk_catalog.json") as f:
with (pathlib.Path(__file__).parent / "bulk_catalog.json").open() as f:
data = json.loads(f.read())
return ConfiguredAirbyteCatalog.parse_obj(data)


@pytest.fixture(scope="module")
def rest_catalog():
with open("unit_tests/rest_catalog.json") as f:
with (pathlib.Path(__file__).parent / "rest_catalog.json").open() as f:
data = json.loads(f.read())
return ConfiguredAirbyteCatalog.parse_obj(data)
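
A small sketch of why this change matters (the `load_catalog` helper is illustrative): resolving fixture files relative to `__file__` makes the fixtures independent of the working directory pytest happens to be launched from.

```python
import json
import pathlib

def load_catalog(name: str) -> dict:
    # Resolve the fixture next to this conftest module instead of relying on
    # the current working directory, so the tests pass no matter where pytest
    # is invoked from.
    with (pathlib.Path(__file__).parent / name).open() as f:
        return json.load(f)
```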


docs/integrations/sources/salesforce.md (2 additions, 1 deletion)
@@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the following

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
-| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state |
+| 2.4.1 | 2024-03-22 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests on IncompleteRead |
[Comment from Contributor Author] To be updated...

+| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state |
| 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) |
| 2.3.2 | 2024-02-19 | [35421](https://github.com/airbytehq/airbyte/pull/35421) | Add Stream Slice Step option to specification |
| 2.3.1 | 2024-02-12 | [35147](https://github.com/airbytehq/airbyte/pull/35147) | Manage dependencies with Poetry. |