Skip to content

Commit 5c6024b

Browse files
authored
✨ Source Salesforce: Adding bulk stream mock server tests (#37749)
1 parent d3864c2 commit 5c6024b

File tree

7 files changed

+307
-73
lines changed

7 files changed

+307
-73
lines changed

airbyte-integrations/connectors/source-salesforce/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: b117307c-14b6-41aa-9422-947e34922962
13-
dockerImageTag: 2.5.8
13+
dockerImageTag: 2.5.9
1414
dockerRepository: airbyte/source-salesforce
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
1616
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.5.8"
6+
version = "2.5.9"
77
name = "source-salesforce"
88
description = "Source implementation for Salesforce."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py

+7-32
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from airbyte_cdk.utils import AirbyteTracedException
3333
from conftest import encoding_symbols_parameters, generate_stream
3434
from requests.exceptions import ChunkedEncodingError, HTTPError
35-
from salesforce_job_response_builder import SalesforceJobResponseBuilder
35+
from salesforce_job_response_builder import JobInfoResponseBuilder
3636
from source_salesforce.api import Salesforce
3737
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
3838
from source_salesforce.source import SourceSalesforce
@@ -47,7 +47,7 @@
4747

4848
_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
4949
_A_JSON_RESPONSE = {"id": "any id"}
50-
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
50+
_A_SUCCESSFUL_JOB_CREATION_RESPONSE = JobInfoResponseBuilder().with_state("JobComplete").get_response()
5151
_A_PK = "a_pk"
5252
_A_STREAM_NAME = "a_stream_name"
5353

@@ -179,31 +179,6 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap
179179
assert not isinstance(stream, BulkSalesforceStream)
180180

181181

182-
def test_bulk_sync_pagination(stream_config, stream_api, requests_mock):
183-
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
184-
job_id = "fake_job"
185-
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
186-
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
187-
resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)]
188-
result_uri = requests_mock.register_uri(
189-
"GET",
190-
stream.path() + f"/{job_id}/results",
191-
[
192-
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}},
193-
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}},
194-
{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}},
195-
],
196-
)
197-
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}")
198-
199-
stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
200-
loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
201-
assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
202-
assert result_uri.call_count == 3
203-
assert result_uri.request_history[1].query == "locator=somelocator_1"
204-
assert result_uri.request_history[2].query == "locator=somelocator_2"
205-
206-
207182
def _prepare_mock(m, stream):
208183
job_id = "fake_job_1"
209184
m.register_uri("POST", stream.path(), json={"id": job_id})
@@ -481,7 +456,7 @@ def test_given_retryable_error_when_download_data_then_retry(send_http_request_p
481456
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
482457
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
483458
sf_api = Mock()
484-
sf_api.generate_schema.return_value = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
459+
sf_api.generate_schema.return_value = JobInfoResponseBuilder().with_state("JobComplete").get_response()
485460
sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
486461
job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
487462
send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
@@ -869,13 +844,13 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
869844
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api, state=state, legacy=True)
870845

871846
job_id_1 = "fake_job_1"
872-
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
847+
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": JobInfoResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
873848
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}")
874849
requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1")
875850
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}")
876851

877852
job_id_2 = "fake_job_2"
878-
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
853+
requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": JobInfoResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
879854
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}")
880855
requests_mock.register_uri(
881856
"GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22"
@@ -886,7 +861,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
886861
queries_history = requests_mock.register_uri(
887862
"POST", stream.path(), [{"json": {"id": job_id_1}}, {"json": {"id": job_id_2}}, {"json": {"id": job_id_3}}]
888863
)
889-
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
864+
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": JobInfoResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
890865
requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}")
891866
requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3")
892867
requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}")
@@ -945,7 +920,7 @@ def test_stream_slices_for_substream(stream_config, stream_api, requests_mock):
945920

946921
job_id = "fake_job"
947922
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
948-
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
923+
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=JobInfoResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
949924
requests_mock.register_uri(
950925
"GET",
951926
stream.path() + f"/{job_id}/results",

0 commit comments

Comments
 (0)