-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Salesforce: retry on download_data and create_stream_job #36385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 6 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
142173b
Salesforce: retry on IncompleteRead
maxi297 ff26052
Update version
maxi297 5bf5e57
format
maxi297 7c4baa5
Fix error handling and add retries on download_data and create_stream…
maxi297 e50fe98
format
maxi297 cf04b83
Retry whole job once
maxi297 3499484
Retry on 406 and 420 as well
maxi297 f23353f
format
maxi297 734b486
Handle non-json responses on errors
maxi297 e89a9c1
Improve @default_backoff_handler interface
maxi297 3f32207
Format and update changelogs
maxi297 bf0935b
Update changelog date
maxi297 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] | |
build-backend = "poetry.core.masonry.api" | ||
|
||
[tool.poetry] | ||
version = "2.4.0" | ||
version = "2.4.1" | ||
name = "source-salesforce" | ||
description = "Source implementation for Salesforce." | ||
authors = [ "Airbyte <[email protected]>",] | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ | |
from airbyte_cdk.test.entrypoint_wrapper import read | ||
from airbyte_cdk.utils import AirbyteTracedException | ||
from conftest import encoding_symbols_parameters, generate_stream | ||
from requests.exceptions import HTTPError | ||
from requests.exceptions import ChunkedEncodingError, HTTPError | ||
from source_salesforce.api import Salesforce | ||
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING | ||
from source_salesforce.source import SourceSalesforce | ||
|
@@ -38,12 +38,19 @@ | |
BulkIncrementalSalesforceStream, | ||
BulkSalesforceStream, | ||
BulkSalesforceSubStream, | ||
Describe, | ||
IncrementalRestSalesforceStream, | ||
RestSalesforceStream, | ||
SalesforceStream, | ||
) | ||
|
||
_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"] | ||
_A_JSON_RESPONSE = {"id": "any id"} | ||
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = {"state": "JobComplete"} | ||
_A_PK = "a_pk" | ||
_A_STREAM_NAME = "a_stream_name" | ||
|
||
_NUMBER_OF_DOWNLOAD_TRIES = 5 | ||
_FIRST_CALL_FROM_JOB_CREATION = 1 | ||
|
||
_ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []}) | ||
_ANY_CONFIG = {} | ||
_ANY_STATE = None | ||
|
@@ -589,6 +596,52 @@ def test_csv_reader_dialect_unix(): | |
assert result == data | ||
|
||
|
||
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request") | ||
def test_given_retryable_error_when_download_data_then_retry(send_http_request_patch): | ||
send_http_request_patch.return_value.iter_content.side_effect = [HTTPError(), _A_CHUNKED_RESPONSE] | ||
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).download_data(url="any url") | ||
assert send_http_request_patch.call_count == 2 | ||
|
||
|
||
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request") | ||
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch): | ||
sf_api = Mock() | ||
sf_api.generate_schema.return_value = {} | ||
sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com" | ||
job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE] | ||
send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2 | ||
send_http_request_patch.return_value.iter_content.side_effect = HTTPError() | ||
|
||
with pytest.raises(Exception): | ||
list(BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=sf_api, pk=_A_PK).read_records(SyncMode.full_refresh)) | ||
|
||
assert send_http_request_patch.call_count == (len(job_creation_return_values) + _NUMBER_OF_DOWNLOAD_TRIES) * 2 | ||
|
||
|
||
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request") | ||
def test_given_http_errors_when_create_stream_job_then_retry(send_http_request_patch): | ||
send_http_request_patch.return_value.json.side_effect = [HTTPError(), _A_JSON_RESPONSE] | ||
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url") | ||
assert send_http_request_patch.call_count == 2 | ||
|
||
|
||
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request") | ||
def test_given_fail_with_http_errors_when_create_stream_job_then_handle_error(send_http_request_patch): | ||
mocked_response = Mock() | ||
mocked_response.status_code = 666 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👿 |
||
send_http_request_patch.return_value.json.side_effect = HTTPError(response=mocked_response) | ||
|
||
with pytest.raises(HTTPError): | ||
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url") | ||
|
||
|
||
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request") | ||
def test_given_retryable_error_that_are_not_http_errors_when_create_stream_job_then_retry(send_http_request_patch): | ||
send_http_request_patch.return_value.json.side_effect = [ChunkedEncodingError(), _A_JSON_RESPONSE] | ||
BulkSalesforceStream(stream_name=_A_STREAM_NAME, sf_api=Mock(), pk=_A_PK).create_stream_job(query="any query", url="any url") | ||
assert send_http_request_patch.call_count == 2 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"stream_names,catalog_stream_names,", | ||
( | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the followin | |
|
||
| Version | Date | Pull Request | Subject | | ||
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| | ||
| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state | | ||
| 2.4.1 | 2024-03-22 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests on IncompleteRead | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be updated... |
||
| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state | | ||
| 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) | | ||
| 2.3.2 | 2024-02-19 | [35421](https://github.com/airbytehq/airbyte/pull/35421) | Add Stream Slice Step option to specification | | ||
| 2.3.1 | 2024-02-12 | [35147](https://github.com/airbytehq/airbyte/pull/35147) | Manage dependencies with Poetry. | | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like individual parameters would be better. Let's see if we can do this even if
method
can take different parameters