Skip to content

✨ Source Salesforce: Adding bulk stream mock server tests #37749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.5.8
dockerImageTag: 2.5.9
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.8"
version = "2.5.9"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airbyte_cdk.utils import AirbyteTracedException
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import ChunkedEncodingError, HTTPError
from salesforce_job_response_builder import SalesforceJobResponseBuilder
from salesforce_job_response_builder import JobInfoResponseBuilder
from source_salesforce.api import Salesforce
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
from source_salesforce.source import SourceSalesforce
Expand All @@ -47,7 +47,7 @@

_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
_A_JSON_RESPONSE = {"id": "any id"}
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = JobInfoResponseBuilder().with_state("JobComplete").get_response()
_A_PK = "a_pk"
_A_STREAM_NAME = "a_stream_name"

Expand Down Expand Up @@ -179,31 +179,6 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap
assert not isinstance(stream, BulkSalesforceStream)


def test_bulk_sync_pagination(stream_config, stream_api, requests_mock):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to test_given_locator_when_read_then_extract_records_from_both_pages

stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
job_id = "fake_job"
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)]
result_uri = requests_mock.register_uri(
"GET",
stream.path() + f"/{job_id}/results",
[
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}},
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}},
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}},
],
)
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}")

stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
assert result_uri.call_count == 3
assert result_uri.request_history[1].query == "locator=somelocator_1"
assert result_uri.request_history[2].query == "locator=somelocator_2"


def _prepare_mock(m, stream):
job_id = "fake_job_1"
m.register_uri("POST", stream.path(), json={"id": job_id})
Expand Down Expand Up @@ -481,7 +456,7 @@ def test_given_retryable_error_when_download_data_then_retry(send_http_request_p
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
sf_api = Mock()
sf_api.generate_schema.return_value = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
sf_api.generate_schema.return_value = JobInfoResponseBuilder().with_state("JobComplete").get_response()
sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
Expand Down Expand Up @@ -869,13 +844,13 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api, state=state, legacy=True)

job_id_1 = "fake_job_1"
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": JobInfoResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}")
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1")
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}")

job_id_2 = "fake_job_2"
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": JobInfoResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}")
requests_mock.register_uri(
"GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22"
Expand All @@ -886,7 +861,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
queries_history = requests_mock.register_uri(
"POST", stream.path(), [{"json": {"id": job_id_1}}, {"json": {"id": job_id_2}}, {"json": {"id": job_id_3}}]
)
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": JobInfoResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}")
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3")
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}")
Expand Down Expand Up @@ -945,7 +920,7 @@ def test_stream_slices_for_substream(stream_config, stream_api, requests_mock):

job_id = "fake_job"
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=JobInfoResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
requests_mock.register_uri(
"GET",
stream.path() + f"/{job_id}/results",
Expand Down
Loading
Loading