Skip to content

Commit 0f1eeb1

Browse files
authored
🐛 Source Salesforce: Add retry on REST API (#36885)
1 parent b753ade commit 0f1eeb1

File tree

7 files changed

+136
-3
lines changed

7 files changed

+136
-3
lines changed

airbyte-integrations/connectors/source-salesforce/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: b117307c-14b6-41aa-9422-947e34922962
13-
dockerImageTag: 2.4.2
13+
dockerImageTag: 2.4.3
1414
dockerRepository: airbyte/source-salesforce
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
1616
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.4.2"
6+
version = "2.4.3"
77
name = "source-salesforce"
88
description = "Source implementation for Salesforce."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

+1
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ def _read_pages(
296296
# Always return an empty generator just in case no records were ever yielded
297297
yield from []
298298

299+
@default_backoff_handler(max_tries=5, backoff_method=backoff.constant, backoff_params={"interval": 5})
299300
def _fetch_next_page_for_chunk(
300301
self,
301302
stream_slice: Mapping[str, Any] = None,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from datetime import datetime
4+
from typing import Any, Mapping
5+
6+
7+
class ConfigBuilder:
8+
def __init__(self) -> None:
9+
self._config = {
10+
"client_id": "fake_client_id",
11+
"client_secret": "fake_client_secret",
12+
"refresh_token": "fake_refresh_token",
13+
"start_date": "2010-01-18T21:18:20Z",
14+
"is_sandbox": False,
15+
"wait_timeout": 15,
16+
}
17+
18+
def start_date(self, start_date: datetime) -> "ConfigBuilder":
19+
self._config["start_date"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
20+
return self
21+
22+
def stream_slice_step(self, stream_slice_step: str) -> "ConfigBuilder":
23+
self._config["stream_slice_step"] = stream_slice_step
24+
return self
25+
26+
def client_id(self, client_id: str) -> "ConfigBuilder":
27+
self._config["client_id"] = client_id
28+
return self
29+
30+
def client_secret(self, client_secret: str) -> "ConfigBuilder":
31+
self._config["client_secret"] = client_secret
32+
return self
33+
34+
def refresh_token(self, refresh_token: str) -> "ConfigBuilder":
35+
self._config["refresh_token"] = refresh_token
36+
return self
37+
38+
def build(self) -> Mapping[str, Any]:
39+
return self._config

airbyte-integrations/connectors/source-salesforce/unit_tests/integration/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
import json
4+
from datetime import datetime, timezone
5+
from typing import Any, Dict, Optional
6+
from unittest import TestCase
7+
8+
import freezegun
9+
from airbyte_cdk.sources.source import TState
10+
from airbyte_cdk.test.catalog_builder import CatalogBuilder
11+
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
12+
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
13+
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS
14+
from airbyte_cdk.test.state_builder import StateBuilder
15+
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode
16+
from config_builder import ConfigBuilder
17+
from source_salesforce import SourceSalesforce
18+
from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
19+
20+
_A_FIELD_NAME = "a_field"
21+
_ACCESS_TOKEN = "an_access_token"
22+
_API_VERSION = "v57.0"
23+
_CLIENT_ID = "a_client_id"
24+
_CLIENT_SECRET = "a_client_secret"
25+
_INSTANCE_URL = "https://instance.salesforce.com"
26+
_NOW = datetime.now(timezone.utc)
27+
_REFRESH_TOKEN = "a_refresh_token"
28+
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]
29+
30+
31+
def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
32+
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()
33+
34+
35+
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceSalesforce:
36+
return SourceSalesforce(catalog, config, state)
37+
38+
39+
def _read(
40+
sync_mode: SyncMode,
41+
config_builder: Optional[ConfigBuilder] = None,
42+
expecting_exception: bool = False
43+
) -> EntrypointOutput:
44+
catalog = _catalog(sync_mode)
45+
config = config_builder.build() if config_builder else ConfigBuilder().build()
46+
state = StateBuilder().build()
47+
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)
48+
49+
50+
def _given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str) -> None:
51+
http_mocker.post(
52+
HttpRequest(
53+
"https://login.salesforce.com/services/oauth2/token",
54+
query_params=ANY_QUERY_PARAMS,
55+
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
56+
),
57+
HttpResponse(json.dumps({"access_token": _ACCESS_TOKEN, "instance_url": _INSTANCE_URL})),
58+
)
59+
60+
61+
def _given_stream(http_mocker: HttpMocker, stream_name: str, field_name: str) -> None:
62+
http_mocker.get(
63+
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects"),
64+
HttpResponse(json.dumps({"sobjects": [{"name": stream_name, "queryable": True}]})),
65+
)
66+
http_mocker.get(
67+
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects/AcceptedEventRelation/describe"),
68+
HttpResponse(json.dumps({"fields": [{"name": field_name, "type": "string"}]})),
69+
)
70+
71+
72+
@freezegun.freeze_time(_NOW.isoformat())
73+
class FullRefreshTest(TestCase):
74+
75+
def setUp(self) -> None:
76+
self._config = ConfigBuilder().client_id(_CLIENT_ID).client_secret(_CLIENT_SECRET).refresh_token(_REFRESH_TOKEN)
77+
78+
@HttpMocker()
79+
def test_given_error_on_fetch_chunk_when_read_then_retry(self, http_mocker: HttpMocker) -> None:
80+
_given_authentication(http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN)
81+
_given_stream(http_mocker, _STREAM_NAME, _A_FIELD_NAME)
82+
http_mocker.get(
83+
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/queryAll?q=SELECT+{_A_FIELD_NAME}+FROM+{_STREAM_NAME}+"),
84+
[
85+
HttpResponse("", status_code=406),
86+
HttpResponse(json.dumps({"records": [{"a_field": "a_value"}]})),
87+
]
88+
)
89+
90+
output = _read(SyncMode.full_refresh, self._config)
91+
92+
assert len(output.records) == 1

docs/integrations/sources/salesforce.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the followin
193193

194194
| Version | Date | Pull Request | Subject |
195195
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
196-
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
196+
| 2.4.3 | 2024-04-08 | [36885](https://github.com/airbytehq/airbyte/pull/36885) | Add missing retry on REST API |
197+
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
197198
| 2.4.1 | 2024-04-03 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests and jobs on various cases |
198199
| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state |
199200
| 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) |

0 commit comments

Comments
 (0)