
✨ Source Salesforce: Adding bulk stream mock server tests #37749


Merged · May 6, 2024 · 6 commits (changes shown from 3 commits)
metadata.yaml
@@ -10,7 +10,7 @@ data:
  connectorSubtype: api
  connectorType: source
  definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.5.8
+  dockerImageTag: 2.5.9
  dockerRepository: airbyte/source-salesforce
  documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
  githubIssueLabel: source-salesforce
pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.8"
version = "2.5.9"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]
@@ -179,31 +179,6 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_api):
    assert not isinstance(stream, BulkSalesforceStream)


[Review comment · Contributor Author] Moved to test_given_locator_when_read_then_extract_records_from_both_pages

-def test_bulk_sync_pagination(stream_config, stream_api, requests_mock):
-    stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
-    job_id = "fake_job"
-    requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
-    requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
-    resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)]
-    result_uri = requests_mock.register_uri(
-        "GET",
-        stream.path() + f"/{job_id}/results",
-        [
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}},
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}},
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}},
-        ],
-    )
-    requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}")
-
-    stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
-    loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
-    assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
-    assert result_uri.call_count == 3
-    assert result_uri.request_history[1].query == "locator=somelocator_1"
-    assert result_uri.request_history[2].query == "locator=somelocator_2"

def _prepare_mock(m, stream):
    job_id = "fake_job_1"
    m.register_uri("POST", stream.path(), json={"id": job_id})
@@ -8,8 +8,10 @@

import freezegun
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
-from airbyte_protocol.models import SyncMode
+from airbyte_protocol.models import AirbyteStreamStatus, FailureType, SyncMode
from config_builder import ConfigBuilder
+from integration.test_rest_stream import create_http_request as create_standard_http_request
+from integration.test_rest_stream import create_http_response as create_standard_http_response
from integration.utils import create_base_url, given_authentication, given_stream, read
from salesforce_describe_response_builder import SalesforceDescribeResponseBuilder
from salesforce_job_response_builder import SalesforceJobResponseBuilder
@@ -20,11 +22,15 @@
_CLIENT_ID = "a_client_id"
_CLIENT_SECRET = "a_client_secret"
_CURSOR_FIELD = "SystemModstamp"
_INCREMENTAL_FIELDS = [_A_FIELD_NAME, _CURSOR_FIELD]
_INCREMENTAL_SCHEMA_BUILDER = SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime") # re-using same fields as _INCREMENTAL_FIELDS
_INSTANCE_URL = "https://instance.salesforce.com"
_JOB_ID = "a-job-id"
_LOOKBACK_WINDOW = timedelta(seconds=LOOKBACK_SECONDS)
_NOW = datetime.now(timezone.utc)
_REFRESH_TOKEN = "a_refresh_token"
_RETRYABLE_RESPONSE = HttpResponse("{}", 420) # TODO: document what the body actually is on 420 errors
_SECOND_PAGE_LOCATOR = "second-page-locator"
_STREAM_NAME = "a_stream_name"

_BASE_URL = create_base_url(_INSTANCE_URL)
@@ -65,9 +71,60 @@ def tearDown(self) -> None:
    def test_when_read_then_create_job_and_extract_records_from_result(self) -> None:
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
-            HttpRequest(f"{_BASE_URL}/jobs/query", body=json.dumps({"operation": "queryAll", "query": "SELECT a_field FROM a_stream_name", "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"})),
+            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            [
                SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("InProgress").build(),
                SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("UploadComplete").build(),
                SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
            ],
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            HttpResponse(f"{_A_FIELD_NAME}\nfield_value"),
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.records) == 1

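Note: the mocks in the test above trace the Bulk API 2.0 job lifecycle the connector drives: create a query job, poll its status until a terminal state, download the CSV results, then delete the job. A minimal sketch of that flow, assuming a plain requests session rather than the connector's HTTP client (the helper name and error handling are illustrative, not taken from this PR):

import time

import requests


def run_bulk_query(base_url: str, token: str, query: str) -> str:
    """Illustrative sketch only: the real connector also handles auth refresh,
    retries, pagination, and the Failed/Aborted fallbacks tested below."""
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    body = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
    # 1. Create the query job.
    job_id = requests.post(f"{base_url}/jobs/query", json=body, headers=headers).json()["id"]
    # 2. Poll until the job leaves InProgress/UploadComplete.
    while True:
        state = requests.get(f"{base_url}/jobs/query/{job_id}", headers=headers).json()["state"]
        if state == "JobComplete":
            break
        if state in ("Failed", "Aborted"):
            raise RuntimeError(f"Bulk job ended in state {state}")
        time.sleep(1)
    # 3. Download the CSV results (locator-based pagination is shown in the next test).
    csv_text = requests.get(f"{base_url}/jobs/query/{job_id}/results", headers=headers).text
    # 4. Best-effort cleanup.
    requests.delete(f"{base_url}/jobs/query/{job_id}", headers=headers)
    return csv_text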
    def test_given_locator_when_read_then_extract_records_from_both_pages(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            HttpResponse(f"{_A_FIELD_NAME}\nfield_value", headers={"Sforce-Locator": _SECOND_PAGE_LOCATOR}),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results", query_params={"locator": _SECOND_PAGE_LOCATOR}),
            HttpResponse(f"{_A_FIELD_NAME}\nanother_field_value"),
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.records) == 2
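Note: the pagination contract mocked here is that of the Bulk API 2.0 results endpoint: a results response may carry an Sforce-Locator header, and the client fetches the next page by sending that value back as the locator query parameter; Salesforce signals the last page with the literal string "null", as the removed unit test above also relied on. A sketch of the consuming loop, assuming a hypothetical fetch_page(locator) helper that returns the CSV text and the response headers:

from typing import Callable, List, Mapping, Optional, Tuple


def read_all_pages(fetch_page: Callable[[Optional[str]], Tuple[str, Mapping[str, str]]]) -> List[str]:
    pages = []
    locator = None
    while True:
        csv_text, headers = fetch_page(locator)  # locator=None requests the first page
        pages.append(csv_text)
        locator = headers.get("Sforce-Locator")
        if not locator or locator == "null":  # "null" marks the final page
            break
    return pages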

    def test_given_job_creation_have_transient_error_when_read_then_sync_properly(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            [
                _RETRYABLE_RESPONSE,
                HttpResponse(json.dumps({"id": _JOB_ID})),
            ],
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            HttpResponse(f"{_A_FIELD_NAME}\nfield_value"),
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.errors) == 0
        assert len(output.records) == 1
    def test_given_bulk_restrictions_when_read_then_switch_to_standard(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            [
                HttpResponse("[{}]", 403),
                HttpResponse(json.dumps({"id": _JOB_ID})),
            ],
        )
        self._http_mocker.get(
            create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]),
            create_standard_http_response([_A_FIELD_NAME]),
        )

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.records) == 1
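Note: the fallback exercised here is the connector switching from the Bulk API to standard REST reads when Salesforce refuses to create a bulk job, which is why the second set of mocks is built from test_rest_stream helpers. A rough sketch of that decision, with hypothetical callables standing in for the connector's internals:

import requests


def run_query_with_fallback(create_bulk_job, read_via_rest, soql: str):
    # Hypothetical wrapper: try the Bulk API first, fall back to standard REST
    # when job creation is forbidden (e.g. the object does not support bulk queries).
    try:
        return create_bulk_job(soql)
    except requests.HTTPError as error:
        if error.response is not None and error.response.status_code == 403:
            return read_via_rest(soql)
        raise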

    def test_given_non_transient_error_on_job_creation_when_read_then_fail_sync(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps([{"errorCode": "API_ERROR", "message": "Implementation restriction... <can't complete the error message as I can't reproduce this issue>"}]), 400),
        )

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE

    def test_given_job_is_aborted_when_read_then_fail_sync(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("Aborted").build(),
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE

    def test_given_job_is_failed_when_read_then_switch_to_standard(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("Failed").build(),
        )
        self._http_mocker.get(
            create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]),
            create_standard_http_response([_A_FIELD_NAME]),
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.records) == 1

    def test_given_retryable_error_on_download_job_result_when_read_then_extract_records(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            [
                _RETRYABLE_RESPONSE,
                HttpResponse(f"{_A_FIELD_NAME}\nfield_value"),
            ],
        )
        self._mock_delete_job(_JOB_ID)

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert len(output.records) == 1

    def test_given_retryable_error_on_delete_job_result_when_read_then_do_not_break(self):
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            HttpResponse(f"{_A_FIELD_NAME}\nfield_value"),
        )
        self._http_mocker._mock_request_method(  # FIXME to add DELETE method in airbyte_cdk tests
            "delete",
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            [
                _RETRYABLE_RESPONSE,
                HttpResponse(""),
            ],
        )

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.COMPLETE

    def test_given_non_retryable_error_on_delete_job_result_when_read_then_fail_to_sync(self):
        """
        I don't think we've seen errors for that in prod, but it would be sad to kill the stream because of that.

        This is interesting: right now, we retry with the same policies as the other requests, but it seems fair for this to just be
        best effort: catch everything and don't retry.
        """
        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME))
        self._http_mocker.post(
            self._make_full_job_request([_A_FIELD_NAME]),
            HttpResponse(json.dumps({"id": _JOB_ID})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"),
            HttpResponse(f"{_A_FIELD_NAME}\nfield_value"),
        )
        self._http_mocker._mock_request_method(  # FIXME to add DELETE method in airbyte_cdk tests
            "delete",
            HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"),
            HttpResponse("", 429),
        )

        output = read(_STREAM_NAME, SyncMode.full_refresh, self._config)

        assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE
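Note: the docstring above proposes treating job deletion as best effort. A sketch of what that alternative could look like, as a hypothetical design rather than the connector's current behavior (which still retries the DELETE with the shared policy and fails the stream):

import logging


def delete_job_best_effort(session, job_url: str) -> None:
    # Cleanup should never fail the sync: swallow any error and let Salesforce
    # expire the job on its own.
    try:
        session.delete(job_url)
    except Exception:
        logging.warning("Could not delete job at %s; leaving it to expire", job_url)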

    def test_given_incremental_when_read_then_create_job_and_extract_records_from_result(self) -> None:
        start_date = (_NOW - timedelta(days=10)).replace(microsecond=0)
        first_upper_boundary = start_date + timedelta(days=7)
        self._config.start_date(start_date).stream_slice_step("P7D")
-        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime"))
-        self._create_sliced_job(start_date, first_upper_boundary, "first_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=2))
-        self._create_sliced_job(first_upper_boundary, _NOW, "second_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=1))
+        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, _INCREMENTAL_SCHEMA_BUILDER)
+        self._create_sliced_job(start_date, first_upper_boundary, _INCREMENTAL_FIELDS, "first_slice_job_id", record_count=2)
+        self._create_sliced_job(first_upper_boundary, _NOW, _INCREMENTAL_FIELDS, "second_slice_job_id", record_count=1)

        output = read(_STREAM_NAME, SyncMode.incremental, self._config)

@@ -99,22 +301,22 @@ def test_given_slice_fails_when_read_then_state_is_partitioned(self) -> None:
        first_upper_boundary = start_date + slice_range
        second_upper_boundary = first_upper_boundary + slice_range
        self._config.start_date(start_date).stream_slice_step("P7D")
-        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime"))
-        self._create_sliced_job(start_date, first_upper_boundary, "first_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=2))
+        given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, _INCREMENTAL_SCHEMA_BUILDER)
+        self._create_sliced_job(start_date, first_upper_boundary, _INCREMENTAL_FIELDS, "first_slice_job_id", record_count=2)
        self._http_mocker.post(
-            self._create_job_creation_request(first_upper_boundary, second_upper_boundary),
+            self._make_sliced_job_request(first_upper_boundary, second_upper_boundary, _INCREMENTAL_FIELDS),
            HttpResponse("", status_code=400),
        )
-        self._create_sliced_job(second_upper_boundary, _NOW, "third_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=1))
+        self._create_sliced_job(second_upper_boundary, _NOW, _INCREMENTAL_FIELDS, "third_slice_job_id", record_count=1)

        output = read(_STREAM_NAME, SyncMode.incremental, self._config)

        assert len(output.records) == 3
        assert len(output.most_recent_state.stream_state.dict()["slices"]) == 2
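For illustration, the partitioned state asserted above ends up with two disjoint slices, because the failed middle slice leaves a gap instead of letting the ranges merge into one. The shape is roughly the following (key names are a hedged sketch of the slice-based state, not copied from the PR):

{
    "slices": [
        {"start": "<start_date>", "end": "<first_upper_boundary>"},
        {"start": "<second_upper_boundary>", "end": "<_NOW>"}
    ]
}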

-    def _create_sliced_job(self, start_date: datetime, first_upper_boundary: datetime, job_id: str, job_result: str) -> None:
+    def _create_sliced_job(self, lower_boundary: datetime, upper_boundary: datetime, fields: List[str], job_id: str, record_count: int) -> None:
        self._http_mocker.post(
-            self._create_job_creation_request(start_date, first_upper_boundary),
+            self._make_sliced_job_request(lower_boundary, upper_boundary, fields),
            HttpResponse(json.dumps({"id": job_id})),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}"),
            SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").build(),
        )
        self._http_mocker.get(
            HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}/results"),
-            HttpResponse(job_result),
+            HttpResponse(self._generate_csv(fields, count=record_count)),
        )
        self._mock_delete_job(job_id)

    def _mock_delete_job(self, job_id: str) -> None:
        self._http_mocker._mock_request_method(  # FIXME to add DELETE method in airbyte_cdk tests
            "delete",
            HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}"),
-            HttpResponse(job_result),
+            HttpResponse(""),
        )

-    def _create_job_creation_request(self, start_date: datetime, first_upper_boundary: datetime) -> HttpRequest:
+    def _make_sliced_job_request(self, lower_boundary: datetime, upper_boundary: datetime, fields: List[str]) -> HttpRequest:
+        return self._build_job_creation_request(f"SELECT {', '.join(fields)} FROM a_stream_name WHERE SystemModstamp >= {lower_boundary.isoformat(timespec='milliseconds')} AND SystemModstamp < {upper_boundary.isoformat(timespec='milliseconds')}")
+
+    def _make_full_job_request(self, fields: List[str]) -> HttpRequest:
+        return self._build_job_creation_request(f"SELECT {', '.join(fields)} FROM a_stream_name")

    def _generate_csv(self, fields: List[str], count: int = 1) -> str:
        """
        This method does not handle field types for now, which may cause test failures if we start using some fields for
        calculations. One example would be parsing the cursor field to a datetime.
        """
        record = ','.join([f"{field}_value" for field in fields])
        records = '\n'.join([record for _ in range(count)])
        return f"{','.join(fields)}\n{records}"

    def _build_job_creation_request(self, query: str) -> HttpRequest:
        return HttpRequest(f"{_BASE_URL}/jobs/query", body=json.dumps({
            "operation": "queryAll",
-            "query": f"SELECT a_field, SystemModstamp FROM a_stream_name WHERE SystemModstamp >= {start_date.isoformat(timespec='milliseconds')} AND SystemModstamp < {first_upper_boundary.isoformat(timespec='milliseconds')}",
+            "query": query,
            "contentType": "CSV",
            "columnDelimiter": "COMMA",
            "lineEnding": "LF"
        }))
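For reference, a request built through _make_sliced_job_request therefore carries a JSON body like the following (boundary timestamps are illustrative):

{
    "operation": "queryAll",
    "query": "SELECT a_field, SystemModstamp FROM a_stream_name WHERE SystemModstamp >= 2024-04-26T00:00:00.000+00:00 AND SystemModstamp < 2024-05-03T00:00:00.000+00:00",
    "contentType": "CSV",
    "columnDelimiter": "COMMA",
    "lineEnding": "LF"
}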

-    def _generate_csv(self, fields: List[str], count: int = 1) -> str:
-        record = ','.join([f"{field}_value" for field in fields])
-        records = '\n'.join([record for _ in range(count)])
-        return f"{','.join(fields)}\n{records}"