-
Notifications
You must be signed in to change notification settings - Fork 4.5k
🐛 Source Salesforce: Add retry on REST API #36885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
maxi297
merged 3 commits into
master
from
maxi297/salesforce_retry_on_fetch_next_page_for_chunk
Apr 8, 2024
Merged
Changes from 2 commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 39 additions & 0 deletions
39
airbyte-integrations/connectors/source-salesforce/unit_tests/config_builder.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
|
||
from datetime import datetime | ||
from typing import Any, Mapping | ||
|
||
|
||
class ConfigBuilder: | ||
def __init__(self) -> None: | ||
self._config = { | ||
"client_id": "fake_client_id", | ||
"client_secret": "fake_client_secret", | ||
"refresh_token": "fake_refresh_token", | ||
"start_date": "2010-01-18T21:18:20Z", | ||
"is_sandbox": False, | ||
"wait_timeout": 15, | ||
} | ||
|
||
def start_date(self, start_date: datetime) -> "ConfigBuilder": | ||
self._config["start_date"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ") | ||
return self | ||
|
||
def stream_slice_step(self, stream_slice_step: str) -> "ConfigBuilder": | ||
self._config["stream_slice_step"] = stream_slice_step | ||
return self | ||
|
||
def client_id(self, client_id: str) -> "ConfigBuilder": | ||
self._config["client_id"] = client_id | ||
return self | ||
|
||
def client_secret(self, client_secret: str) -> "ConfigBuilder": | ||
self._config["client_secret"] = client_secret | ||
return self | ||
|
||
def refresh_token(self, refresh_token: str) -> "ConfigBuilder": | ||
self._config["refresh_token"] = refresh_token | ||
return self | ||
|
||
def build(self) -> Mapping[str, Any]: | ||
return self._config |
Empty file.
92 changes: 92 additions & 0 deletions
92
...ations/connectors/source-salesforce/unit_tests/integration/test_rest_salesforce_stream.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
|
||
import json | ||
from datetime import datetime, timezone | ||
from typing import Any, Dict, Optional | ||
from unittest import TestCase | ||
|
||
import freezegun | ||
from airbyte_cdk.sources.source import TState | ||
from airbyte_cdk.test.catalog_builder import CatalogBuilder | ||
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read | ||
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse | ||
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS | ||
from airbyte_cdk.test.state_builder import StateBuilder | ||
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode | ||
from config_builder import ConfigBuilder | ||
from source_salesforce import SourceSalesforce | ||
from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS | ||
|
||
_A_FIELD_NAME = "a_field" | ||
_ACCESS_TOKEN = "an_access_token" | ||
_API_VERSION = "v57.0" | ||
_CLIENT_ID = "a_client_id" | ||
_CLIENT_SECRET = "a_client_secret" | ||
_INSTANCE_URL = "https://instance.salesforce.com" | ||
_NOW = datetime.now(timezone.utc) | ||
_REFRESH_TOKEN = "a_refresh_token" | ||
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0] | ||
|
||
|
||
def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: | ||
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() | ||
|
||
|
||
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceSalesforce: | ||
return SourceSalesforce(catalog, config, state) | ||
|
||
|
||
def _read( | ||
sync_mode: SyncMode, | ||
config_builder: Optional[ConfigBuilder] = None, | ||
expecting_exception: bool = False | ||
) -> EntrypointOutput: | ||
catalog = _catalog(sync_mode) | ||
config = config_builder.build() if config_builder else ConfigBuilder().build() | ||
state = StateBuilder().build() | ||
return read(_source(catalog, config, state), config, catalog, state, expecting_exception) | ||
|
||
|
||
def _given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str) -> None: | ||
http_mocker.post( | ||
HttpRequest( | ||
"https://login.salesforce.com/services/oauth2/token", | ||
query_params=ANY_QUERY_PARAMS, | ||
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}" | ||
), | ||
HttpResponse(json.dumps({"access_token": _ACCESS_TOKEN, "instance_url": _INSTANCE_URL})), | ||
) | ||
|
||
|
||
def _given_stream(http_mocker: HttpMocker, stream_name: str, field_name: str) -> None: | ||
http_mocker.get( | ||
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects"), | ||
HttpResponse(json.dumps({"sobjects": [{"name": stream_name, "queryable": True}]})), | ||
) | ||
http_mocker.get( | ||
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects/AcceptedEventRelation/describe"), | ||
HttpResponse(json.dumps({"fields": [{"name": field_name, "type": "string"}]})), | ||
) | ||
|
||
|
||
@freezegun.freeze_time(_NOW.isoformat()) | ||
class FullRefreshTest(TestCase): | ||
|
||
def setUp(self) -> None: | ||
self._config = ConfigBuilder().client_id(_CLIENT_ID).client_secret(_CLIENT_SECRET).refresh_token(_REFRESH_TOKEN) | ||
|
||
@HttpMocker() | ||
def test_given_error_on_fetch_chunk_when_read_then_retry(self, http_mocker: HttpMocker) -> None: | ||
_given_authentication(http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN) | ||
_given_stream(http_mocker, _STREAM_NAME, _A_FIELD_NAME) | ||
http_mocker.get( | ||
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/queryAll?q=SELECT+{_A_FIELD_NAME}+FROM+{_STREAM_NAME}+"), | ||
[ | ||
HttpResponse("", status_code=406), | ||
HttpResponse(json.dumps({"records": [{"a_field": "a_value"}]})), | ||
] | ||
) | ||
|
||
output = _read(SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will clash with the source-salesforce part of https://github.com/airbytehq/airbyte/pull/36811/files#diff-82cd2ba6c00f795c829bdc86d1f039933a6168b6c1c6c1a4b89226c7b5c6bf67 but that shouldn't be too much of a hassle