Skip to content

🐛 Source Salesforce: Add retry on REST API #36885

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.4.2
dockerImageTag: 2.4.3
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def _read_pages(
# Always return an empty generator just in case no records were ever yielded
yield from []

@default_backoff_handler(max_tries=5, backoff_method=backoff.constant, backoff_params={"interval": 5})
def _fetch_next_page_for_chunk(
self,
stream_slice: Mapping[str, Any] = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from datetime import datetime
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will clash with the source-salesforce part of https://github.com/airbytehq/airbyte/pull/36811/files#diff-82cd2ba6c00f795c829bdc86d1f039933a6168b6c1c6c1a4b89226c7b5c6bf67 but that shouldn't be too much of a hassle

from typing import Any, Mapping


class ConfigBuilder:
def __init__(self) -> None:
self._config = {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"start_date": "2010-01-18T21:18:20Z",
"is_sandbox": False,
"wait_timeout": 15,
}

def start_date(self, start_date: datetime) -> "ConfigBuilder":
self._config["start_date"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
return self

def stream_slice_step(self, stream_slice_step: str) -> "ConfigBuilder":
self._config["stream_slice_step"] = stream_slice_step
return self

def client_id(self, client_id: str) -> "ConfigBuilder":
self._config["client_id"] = client_id
return self

def client_secret(self, client_secret: str) -> "ConfigBuilder":
self._config["client_secret"] = client_secret
return self

def refresh_token(self, refresh_token: str) -> "ConfigBuilder":
self._config["refresh_token"] = refresh_token
return self

def build(self) -> Mapping[str, Any]:
return self._config
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode
from config_builder import ConfigBuilder
from source_salesforce import SourceSalesforce
from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS

_A_FIELD_NAME = "a_field"
_ACCESS_TOKEN = "an_access_token"
_API_VERSION = "v57.0"
_CLIENT_ID = "a_client_id"
_CLIENT_SECRET = "a_client_secret"
_INSTANCE_URL = "https://instance.salesforce.com"
_NOW = datetime.now(timezone.utc)
_REFRESH_TOKEN = "a_refresh_token"
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]


def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceSalesforce:
return SourceSalesforce(catalog, config, state)


def _read(
sync_mode: SyncMode,
config_builder: Optional[ConfigBuilder] = None,
expecting_exception: bool = False
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build() if config_builder else ConfigBuilder().build()
state = StateBuilder().build()
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str) -> None:
http_mocker.post(
HttpRequest(
"https://login.salesforce.com/services/oauth2/token",
query_params=ANY_QUERY_PARAMS,
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
),
HttpResponse(json.dumps({"access_token": _ACCESS_TOKEN, "instance_url": _INSTANCE_URL})),
)


def _given_stream(http_mocker: HttpMocker, stream_name: str, field_name: str) -> None:
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects"),
HttpResponse(json.dumps({"sobjects": [{"name": stream_name, "queryable": True}]})),
)
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects/AcceptedEventRelation/describe"),
HttpResponse(json.dumps({"fields": [{"name": field_name, "type": "string"}]})),
)


@freezegun.freeze_time(_NOW.isoformat())
class FullRefreshTest(TestCase):

def setUp(self) -> None:
self._config = ConfigBuilder().client_id(_CLIENT_ID).client_secret(_CLIENT_SECRET).refresh_token(_REFRESH_TOKEN)

@HttpMocker()
def test_given_error_on_fetch_chunk_when_read_then_retry(self, http_mocker: HttpMocker) -> None:
_given_authentication(http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN)
_given_stream(http_mocker, _STREAM_NAME, _A_FIELD_NAME)
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/queryAll?q=SELECT+{_A_FIELD_NAME}+FROM+{_STREAM_NAME}+"),
[
HttpResponse("", status_code=406),
HttpResponse(json.dumps({"records": [{"a_field": "a_value"}]})),
]
)

output = _read(SyncMode.full_refresh, self._config)

assert len(output.records) == 1
3 changes: 2 additions & 1 deletion docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
| 2.4.3 | 2024-04-08 | [36885](https://github.com/airbytehq/airbyte/pull/36885) | Add missing retry on REST API |
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
| 2.4.1 | 2024-04-03 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests and jobs on various cases |
| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state |
| 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) |
Expand Down
Loading