
Commit 33d6c50

Salesforce: retry on download_data and create_stream_job (#36385)
1 parent fbe5f26 commit 33d6c50

File tree: 8 files changed (+153 −30)

airbyte-integrations/connectors/source-salesforce/metadata.yaml (+1 −1)
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.4.0
+  dockerImageTag: 2.4.1
   dockerRepository: airbyte/source-salesforce
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml (+1 −1)
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "2.4.0"
+version = "2.4.1"
 name = "source-salesforce"
 description = "Source implementation for Salesforce."
 authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py (+2 −1)
@@ -6,6 +6,7 @@
 import logging
 from typing import Any, List, Mapping, Optional, Tuple

+import backoff
 import requests  # type: ignore[import]
 from airbyte_cdk.models import ConfiguredAirbyteCatalog
 from airbyte_cdk.utils import AirbyteTracedException
@@ -300,7 +301,7 @@ def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog
         validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
         return {stream_name: sobject_options for stream_name, sobject_options in stream_objects.items() if stream_name in validated_streams}

-    @default_backoff_handler(max_tries=5, factor=5)
+    @default_backoff_handler(max_tries=5, backoff_method=backoff.expo, backoff_params={"factor": 5})
     def _make_request(
         self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None
     ) -> requests.models.Response:
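For context, the decorator change above is purely a signature migration: the retry behaviour of `_make_request` is unchanged. A minimal sketch of the new invocation style (the decorated function is a hypothetical stand-in, not part of the commit):

import backoff

from source_salesforce.rate_limiting import default_backoff_handler

# Old style: the strategy was hard-coded to backoff.expo inside the handler.
#   @default_backoff_handler(max_tries=5, factor=5)
# New style: the strategy and its parameters are passed explicitly, so other
# call sites can pick a different curve (e.g. backoff.constant for downloads).
@default_backoff_handler(max_tries=5, backoff_method=backoff.expo, backoff_params={"factor": 5})
def make_request():  # hypothetical stand-in for Salesforce._make_request
    ...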

airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py (+35 −5)
@@ -15,19 +15,50 @@
     exceptions.ReadTimeout,
     exceptions.ConnectionError,
     exceptions.HTTPError,
+    # We've had a couple of customers with ProtocolErrors, namely:
+    # * A self-managed instance during `BulkSalesforceStream.download_data`. This customer had an abnormally high number of ConnectionErrors,
+    #   which seems to indicate problems with their network infrastructure in general. The exact error was: `urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(905 bytes read, 119 more expected)', IncompleteRead(905 bytes read, 119 more expected))`
+    # * A cloud customer with very long syncs. All those syncs would end up with the following error: `urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))`
+    # Without much more information, we will make it retryable, hoping that performing the same request will work.
+    exceptions.ChunkedEncodingError,
+    # We've had examples where the response from Salesforce was not a JSON response. Those cases were error cases, though. For example:
+    # https://github.com/airbytehq/airbyte-internal-issues/issues/6855. We will assume that this is an edge case and that a retry should help.
+    exceptions.JSONDecodeError,
 )

+_RETRYABLE_400_STATUS_CODES = {
+    # Using debug mode and breakpointing on the issue, we were able to validate that these issues are retryable. We've also opened a case
+    # with Salesforce to try to understand what is causing this, as the response does not have a body.
+    406,
+    # Most of the time, these don't have a body, but there was one from the Salesforce Edge mentioning "We are setting things up. This process
+    # can take a few minutes. This page will auto-refresh when ready. If it takes too long, please contact support or visit our <a>status
+    # page</a> for more information." We therefore assume this is a transient error and will retry on it.
+    420,
+    codes.too_many_requests,
+}
+
+
 logger = logging.getLogger("airbyte")


-def default_backoff_handler(max_tries: int, factor: int, **kwargs):
+def default_backoff_handler(max_tries: int, backoff_method=None, backoff_params=None):
+    if backoff_method is None or backoff_params is None:
+        if not (backoff_method is None and backoff_params is None):
+            raise ValueError("Both `backoff_method` and `backoff_params` need to be provided if one is provided")
+        backoff_method = backoff.expo
+        backoff_params = {"factor": 15}
+
     def log_retry_attempt(details):
         _, exc, _ = sys.exc_info()
         logger.info(str(exc))
         logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")

     def should_give_up(exc):
-        give_up = exc.response is not None and exc.response.status_code != codes.too_many_requests and 400 <= exc.response.status_code < 500
+        give_up = (
+            exc.response is not None
+            and exc.response.status_code not in _RETRYABLE_400_STATUS_CODES
+            and 400 <= exc.response.status_code < 500
+        )

         # Salesforce can return an error with a limit using a 403 code error.
         if exc.response is not None and exc.response.status_code == codes.forbidden:
@@ -40,12 +71,11 @@ def should_give_up(exc):
         return give_up

     return backoff.on_exception(
-        backoff.expo,
+        backoff_method,
         TRANSIENT_EXCEPTIONS,
         jitter=None,
         on_backoff=log_retry_attempt,
         giveup=should_give_up,
         max_tries=max_tries,
-        factor=factor,
-        **kwargs,
+        **backoff_params,
     )
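The net effect of `_RETRYABLE_400_STATUS_CODES` on the give-up rule: transient exceptions and 5xx responses are retried as before; 406, 420, and 429 are now retried despite being 4xx; every other 4xx gives up (with the existing 403 REQUEST_LIMIT_EXCEEDED special case layered on top). A small illustrative predicate, simplified from `should_give_up` (hypothetical helper, not part of the commit):

from requests import codes

_RETRYABLE_400_STATUS_CODES = {406, 420, codes.too_many_requests}


def is_retryable(status_code: int) -> bool:
    # 4xx responses are terminal unless explicitly allow-listed; everything
    # else (e.g. 5xx) is assumed transient and retried.
    return not (400 <= status_code < 500 and status_code not in _RETRYABLE_400_STATUS_CODES)


assert is_retryable(429) and is_retryable(406) and is_retryable(503)
assert not is_retryable(400) and not is_retryable(404)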

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py (+53 −16)
@@ -11,8 +11,9 @@
 import uuid
 from abc import ABC
 from contextlib import closing
-from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
+from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union

+import backoff
 import pandas as pd
 import pendulum
 import requests  # type: ignore[import]
@@ -31,14 +32,15 @@
 from .api import PARENT_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
 from .availability_strategy import SalesforceAvailabilityStrategy
 from .exceptions import SalesforceException, TmpFileIOError
-from .rate_limiting import default_backoff_handler
+from .rate_limiting import TRANSIENT_EXCEPTIONS, default_backoff_handler

 # https://stackoverflow.com/a/54517228
 CSV_FIELD_SIZE_LIMIT = int(ctypes.c_ulong(-1).value // 2)
 csv.field_size_limit(CSV_FIELD_SIZE_LIMIT)

 DEFAULT_ENCODING = "utf-8"
 LOOKBACK_SECONDS = 600  # based on https://trailhead.salesforce.com/trailblazer-community/feed/0D54V00007T48TASAZ
+_JOB_TRANSIENT_ERRORS_MAX_RETRY = 1


 class SalesforceStream(HttpStream, ABC):
@@ -351,24 +353,38 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str:

     transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)

-    @default_backoff_handler(max_tries=5, factor=15)
+    @default_backoff_handler(max_tries=5, backoff_method=backoff.expo, backoff_params={"factor": 15})
     def _send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
+        """
+        This method should be used when you don't have to read data from the HTTP body. Otherwise, you will have to retry when you
+        actually read the response buffer (which happens when calling `json` or `iter_content`).
+        """
+        return self._non_retryable_send_http_request(method, url, json, headers, stream)
+
+    def _non_retryable_send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
         headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
         response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
         if response.status_code not in [200, 204]:
             self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
         response.raise_for_status()
         return response

+    @default_backoff_handler(max_tries=5, backoff_method=backoff.expo, backoff_params={"factor": 15})
+    def _create_stream_job(self, query: str, url: str) -> Optional[str]:
+        json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
+        response = self._non_retryable_send_http_request("POST", url, json=json)
+        job_id: str = response.json()["id"]
+        return job_id
+
     def create_stream_job(self, query: str, url: str) -> Optional[str]:
         """
         docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.html
+
+        Note that we want to retry on connection issues as well. Those can occur when calling `.json()`. Even in the case of a
+        connection error during an HTTPError, we will retry, as otherwise we won't be able to take the right action.
         """
-        json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
         try:
-            response = self._send_http_request("POST", url, json=json)
-            job_id: str = response.json()["id"]
-            return job_id
+            return self._create_stream_job(query, url)
         except exceptions.HTTPError as error:
             if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]:
                 # A part of streams can't be used by BULK API. Every API version can have a custom list of
@@ -383,9 +399,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
                 # updated query: "Select Name, (Select Subject,ActivityType from ActivityHistories) from Contact"
                 # The second variant forces customisation for every case (ActivityHistory, ActivityHistories etc).
                 # And the main problem is these subqueries doesn't support CSV response format.
-                error_data = error.response.json()[0]
-                error_code = error_data.get("errorCode")
-                error_message = error_data.get("message", "")
+                error_code, error_message = self._extract_error_code_and_message(error.response)
                 if error_message == "Selecting compound data not supported in Bulk Query" or (
                     error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message
                 ):
@@ -401,7 +415,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
             elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
                 self.logger.error(
                     f"Cannot receive data for stream '{self.name}' ,"
-                    f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'"
+                    f"sobject options: {self.sobject_options}, Error message: '{error_message}'"
                 )
             elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
                 self.logger.error(
@@ -437,9 +451,7 @@ def wait_for_job(self, url: str) -> str:
         try:
             job_info = self._send_http_request("GET", url=url).json()
         except exceptions.HTTPError as error:
-            error_data = error.response.json()[0]
-            error_code = error_data.get("errorCode")
-            error_message = error_data.get("message", "")
+            error_code, error_message = self._extract_error_code_and_message(error.response)
             if (
                 "We can't complete the action because enabled transaction security policies took too long to complete." in error_message
                 and error_code == "TXN_SECURITY_METERING_ERROR"
@@ -473,6 +485,19 @@ def wait_for_job(self, url: str) -> str:
             self.logger.warning(f"Not wait the {self.name} data for {self.DEFAULT_WAIT_TIMEOUT_SECONDS} seconds, data: {job_info}!!")
         return job_status

+    def _extract_error_code_and_message(self, response: requests.Response) -> tuple[Optional[str], str]:
+        try:
+            error_data = response.json()[0]
+            return error_data.get("errorCode"), error_data.get("message", "")
+        except exceptions.JSONDecodeError:
+            self.logger.warning(f"The response for `{response.request.url}` is not JSON but was `{response.content}`")
+        except IndexError:
+            self.logger.warning(
+                f"The response for `{response.request.url}` was expected to be a list with at least one element but was `{response.content}`"
+            )
+
+        return None, ""
+
     def execute_job(self, query: str, url: str) -> Tuple[Optional[str], Optional[str]]:
         job_status = "Failed"
         for i in range(0, self.MAX_RETRY_NUMBER):
@@ -520,6 +545,7 @@ def get_response_encoding(self, headers) -> str:

         return self.encoding

+    @default_backoff_handler(max_tries=5, backoff_method=backoff.constant, backoff_params={"interval": 5})
     def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]:
         """
         Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations.
@@ -529,7 +555,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]:
         """
         # set filepath for binary data from response
         tmp_file = str(uuid.uuid4())
-        with closing(self._send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open(
+        with closing(self._non_retryable_send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open(
             tmp_file, "wb"
         ) as data_file:
             response_headers = response.headers
@@ -615,6 +641,7 @@ def read_records(
         cursor_field: List[str] = None,
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
+        call_count: int = 0,
     ) -> Iterable[Mapping[str, Any]]:
         stream_state = stream_state or {}
         next_page_token = None
@@ -643,7 +670,17 @@ def read_records(
         while True:
             req = PreparedRequest()
             req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator})
-            tmp_file, response_encoding, response_headers = self.download_data(url=req.url)
+            try:
+                tmp_file, response_encoding, response_headers = self.download_data(url=req.url)
+            except TRANSIENT_EXCEPTIONS as exception:
+                if call_count >= _JOB_TRANSIENT_ERRORS_MAX_RETRY:
+                    self.logger.error(f"Downloading data failed even after {call_count} retries. Stopping retry and raising exception")
+                    raise exception
+                self.logger.warning(f"Downloading data failed after {call_count} retries. Retrying the whole job...")
+                call_count += 1
+                yield from self.read_records(sync_mode, cursor_field, stream_slice, stream_state, call_count=call_count)
+                return
+
             for record in self.read_with_chunks(tmp_file, response_encoding):
                 yield record
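The `read_records` change introduces a second retry layer on top of the per-request backoff: `download_data` itself retries up to 5 times at a constant 5-second interval, and if it still fails with a transient error, the whole job is recreated at most `_JOB_TRANSIENT_ERRORS_MAX_RETRY` (= 1) time via recursion. A reduced, standalone sketch of that control flow (the `download` callable is a stand-in for the decorated `download_data`; the real recursion re-enters `read_records`, which creates a fresh Bulk job from scratch):

from typing import Callable, Iterable, List

TRANSIENT_EXCEPTIONS = (ConnectionError,)  # stand-in for the connector's tuple
_JOB_TRANSIENT_ERRORS_MAX_RETRY = 1


def read_records(download: Callable[[], List[str]], call_count: int = 0) -> Iterable[str]:
    try:
        records = download()  # already retried 5 times by its backoff decorator
    except TRANSIENT_EXCEPTIONS:
        if call_count >= _JOB_TRANSIENT_ERRORS_MAX_RETRY:
            raise  # the whole job was already redone once; give up
        # Restarting the generator from the top re-creates the Bulk job,
        # since the failed job's results may no longer be downloadable.
        yield from read_records(download, call_count + 1)
        return
    yield from records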

airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py (+56 −3)
@@ -29,7 +29,7 @@
 from airbyte_cdk.test.entrypoint_wrapper import read
 from airbyte_cdk.utils import AirbyteTracedException
 from conftest import encoding_symbols_parameters, generate_stream
-from requests.exceptions import HTTPError
+from requests.exceptions import ChunkedEncodingError, HTTPError
 from source_salesforce.api import Salesforce
 from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
 from source_salesforce.source import SourceSalesforce
@@ -38,12 +38,19 @@
     BulkIncrementalSalesforceStream,
     BulkSalesforceStream,
     BulkSalesforceSubStream,
-    Describe,
     IncrementalRestSalesforceStream,
     RestSalesforceStream,
-    SalesforceStream,
 )

+_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
+_A_JSON_RESPONSE = {"id": "any id"}
+_A_SUCCESSFUL_JOB_CREATION_RESPONSE = {"state": "JobComplete"}
+_A_PK = "a_pk"
+_A_STREAM_NAME = "a_stream_name"
+
+_NUMBER_OF_DOWNLOAD_TRIES = 5
+_FIRST_CALL_FROM_JOB_CREATION = 1
+
 _ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []})
 _ANY_CONFIG = {}
 _ANY_STATE = None
@@ -589,6 +596,52 @@ def test_csv_reader_dialect_unix():
     assert result == data


+@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
+def test_given_retryable_error_when_download_data_then_retry(send_http_request_patch):
+    send_http_request_patch.return_value.iter_content.side_effect = [HTTPError(), _A_CHUNKED_RESPONSE]
+    BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).download_data(url="any url")
+    assert send_http_request_patch.call_count == 2
+
+
+@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
+def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
+    sf_api = Mock()
+    sf_api.generate_schema.return_value = {}
+    sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
+    job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
+    send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
+    send_http_request_patch.return_value.iter_content.side_effect = HTTPError()
+
+    with pytest.raises(Exception):
+        list(BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=sf_api, pk=_A_PK).read_records(SyncMode.full_refresh))
+
+    assert send_http_request_patch.call_count == (len(job_creation_return_values) + _NUMBER_OF_DOWNLOAD_TRIES) * 2
+
+
+@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
+def test_given_http_errors_when_create_stream_job_then_retry(send_http_request_patch):
+    send_http_request_patch.return_value.json.side_effect = [HTTPError(), _A_JSON_RESPONSE]
+    BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")
+    assert send_http_request_patch.call_count == 2
+
+
+@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
+def test_given_fail_with_http_errors_when_create_stream_job_then_handle_error(send_http_request_patch):
+    mocked_response = Mock()
+    mocked_response.status_code = 666
+    send_http_request_patch.return_value.json.side_effect = HTTPError(response=mocked_response)
+
+    with pytest.raises(HTTPError):
+        BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")
+
+
+@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
+def test_given_retryable_error_that_are_not_http_errors_when_create_stream_job_then_retry(send_http_request_patch):
+    send_http_request_patch.return_value.json.side_effect = [ChunkedEncodingError(), _A_JSON_RESPONSE]
+    BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url")
+    assert send_http_request_patch.call_count == 2
+
+
 @pytest.mark.parametrize(
     "stream_names,catalog_stream_names,",
     (
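These tests all lean on the same `unittest.mock` mechanism: a list assigned to `side_effect` is consumed one call at a time, and an exception instance in the list is raised instead of returned. That is how a test fails the first attempt and succeeds on the retry. A self-contained illustration (not from the commit):

from unittest.mock import Mock

from requests.exceptions import HTTPError

response = Mock()
response.json.side_effect = [HTTPError(), {"id": "any id"}]

try:
    response.json()  # first call raises, simulating a failed body read
except HTTPError:
    pass

assert response.json() == {"id": "any id"}  # the retry succeeds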

airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py (+3 −2)
@@ -3,6 +3,7 @@
 #

 import json
+import pathlib
 from typing import List
 from unittest.mock import Mock

@@ -22,14 +23,14 @@ def time_sleep_mock(mocker):

 @pytest.fixture(scope="module")
 def bulk_catalog():
-    with open("unit_tests/bulk_catalog.json") as f:
+    with (pathlib.Path(__file__).parent / "bulk_catalog.json").open() as f:
         data = json.loads(f.read())
     return ConfiguredAirbyteCatalog.parse_obj(data)


 @pytest.fixture(scope="module")
 def rest_catalog():
-    with open("unit_tests/rest_catalog.json") as f:
+    with (pathlib.Path(__file__).parent / "rest_catalog.json").open() as f:
         data = json.loads(f.read())
     return ConfiguredAirbyteCatalog.parse_obj(data)
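The conftest change makes the fixture paths independent of the working directory pytest is launched from. A minimal illustration of the idiom (file name taken from the diff above):

import pathlib

# `__file__` is the test module itself, so the fixture resolves correctly
# whether pytest runs from the repo root or from the connector directory.
catalog_path = pathlib.Path(__file__).parent / "bulk_catalog.json"
print(catalog_path.is_file())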
