-
Notifications
You must be signed in to change notification settings - Fork 4.5k
✨ Source Salesforce: Adding bulk stream mock server tests #37749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
maxi297
merged 6 commits into
master
from
maxi297/salesforce-add-mock-server-test-for-bulk
May 6, 2024
Merged
Changes from 3 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] | |
build-backend = "poetry.core.masonry.api" | ||
|
||
[tool.poetry] | ||
version = "2.5.8" | ||
version = "2.5.9" | ||
name = "source-salesforce" | ||
description = "Source implementation for Salesforce." | ||
authors = [ "Airbyte <[email protected]>",] | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,8 +8,10 @@ | |
|
||
import freezegun | ||
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse | ||
from airbyte_protocol.models import SyncMode | ||
from airbyte_protocol.models import AirbyteStreamStatus, FailureType, SyncMode | ||
from config_builder import ConfigBuilder | ||
from integration.test_rest_stream import create_http_request as create_standard_http_request | ||
from integration.test_rest_stream import create_http_response as create_standard_http_response | ||
from integration.utils import create_base_url, given_authentication, given_stream, read | ||
from salesforce_describe_response_builder import SalesforceDescribeResponseBuilder | ||
from salesforce_job_response_builder import SalesforceJobResponseBuilder | ||
|
@@ -20,11 +22,15 @@ | |
_CLIENT_ID = "a_client_id" | ||
_CLIENT_SECRET = "a_client_secret" | ||
_CURSOR_FIELD = "SystemModstamp" | ||
_INCREMENTAL_FIELDS = [_A_FIELD_NAME, _CURSOR_FIELD] | ||
_INCREMENTAL_SCHEMA_BUILDER = SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime") # re-using same fields as _INCREMENTAL_FIELDS | ||
_INSTANCE_URL = "https://instance.salesforce.com" | ||
_JOB_ID = "a-job-id" | ||
_LOOKBACK_WINDOW = timedelta(seconds=LOOKBACK_SECONDS) | ||
_NOW = datetime.now(timezone.utc) | ||
_REFRESH_TOKEN = "a_refresh_token" | ||
_RETRYABLE_RESPONSE = HttpResponse("{}", 420) # TODO: document what the body actually is on 420 errors | ||
maxi297 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_SECOND_PAGE_LOCATOR = "second-page-locator" | ||
_STREAM_NAME = "a_stream_name" | ||
|
||
_BASE_URL = create_base_url(_INSTANCE_URL) | ||
|
@@ -65,9 +71,60 @@ def tearDown(self) -> None: | |
def test_when_read_then_create_job_and_extract_records_from_result(self) -> None: | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
HttpRequest(f"{_BASE_URL}/jobs/query", body=json.dumps({"operation": "queryAll", "query": "SELECT a_field FROM a_stream_name", "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"})), | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
[ | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("InProgress").build(), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("UploadComplete").build(), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
], | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value"), | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 1 | ||
|
||
def test_given_locator_when_read_then_extract_records_from_both_pages(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
maxi297 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value", headers={"Sforce-Locator": _SECOND_PAGE_LOCATOR}), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results", query_params={"locator": _SECOND_PAGE_LOCATOR}), | ||
HttpResponse(f"{_A_FIELD_NAME}\nanother_field_value"), | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 2 | ||
|
||
def test_given_job_creation_have_transient_error_when_read_then_sync_properly(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
[ | ||
_RETRYABLE_RESPONSE, | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
], | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
|
@@ -76,18 +133,163 @@ def test_when_read_then_create_job_and_extract_records_from_result(self) -> None | |
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value"), | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.errors) == 0 | ||
assert len(output.records) == 1 | ||
|
||
def test_given_bulk_restrictions_when_read_then_switch_to_standard(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
[ | ||
HttpResponse("[{}]", 403), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
], | ||
) | ||
self._http_mocker.get( | ||
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]), | ||
create_standard_http_response([_A_FIELD_NAME]), | ||
) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 1 | ||
|
||
def test_given_non_transient_error_on_job_creation_when_read_then_fail_sync(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps([{"errorCode": "API_ERROR", "message": "Implementation restriction... <can't complete the error message as I can't reproduce this issue>"}]), 400), | ||
) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE | ||
|
||
def test_given_job_is_aborted_when_read_then_fail_sync(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("Aborted").build(), | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE | ||
|
||
def test_given_job_is_failed_when_read_then_switch_to_standard(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("Failed").build(), | ||
) | ||
self._http_mocker.get( | ||
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]), | ||
create_standard_http_response([_A_FIELD_NAME]), | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 1 | ||
|
||
def test_given_retryable_error_on_download_job_result_when_read_then_extract_records(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
[ | ||
_RETRYABLE_RESPONSE, | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value"), | ||
], | ||
) | ||
self._mock_delete_job(_JOB_ID) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert len(output.records) == 1 | ||
|
||
def test_given_retryable_error_on_delete_job_result_when_read_then_do_not_break(self): | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value"), | ||
) | ||
self._http_mocker._mock_request_method( # FIXME to add DELETE method in airbyte_cdk tests | ||
"delete", | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
[ | ||
_RETRYABLE_RESPONSE, | ||
HttpResponse(""), | ||
], | ||
) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.COMPLETE | ||
|
||
def test_given_non_retryable_error_on_delete_job_result_when_read_then_fail_to_sync(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we've seen errors for that in prod, but it would be sad to kill the stream because of that |
||
This is interesting: right now, we retry with the same policies as the other requests but it seems fair to just be a best effort, | ||
catch everything and not retry | ||
""" | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME)) | ||
self._http_mocker.post( | ||
self._make_full_job_request([_A_FIELD_NAME]), | ||
HttpResponse(json.dumps({"id": _JOB_ID})), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
SalesforceJobResponseBuilder().with_id(_JOB_ID).with_state("JobComplete").build(), | ||
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}/results"), | ||
HttpResponse(f"{_A_FIELD_NAME}\nfield_value"), | ||
) | ||
self._http_mocker._mock_request_method( # FIXME to add DELETE method in airbyte_cdk tests | ||
"delete", | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{_JOB_ID}"), | ||
HttpResponse("", 429), | ||
) | ||
|
||
output = read(_STREAM_NAME, SyncMode.full_refresh, self._config) | ||
|
||
assert output.get_stream_statuses(_STREAM_NAME)[-1] == AirbyteStreamStatus.INCOMPLETE | ||
|
||
def test_given_incremental_when_read_then_create_job_and_extract_records_from_result(self) -> None: | ||
start_date = (_NOW - timedelta(days=10)).replace(microsecond=0) | ||
first_upper_boundary = start_date + timedelta(days=7) | ||
self._config.start_date(start_date).stream_slice_step("P7D") | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime")) | ||
self._create_sliced_job(start_date, first_upper_boundary, "first_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=2)) | ||
self._create_sliced_job(first_upper_boundary, _NOW, "second_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=1)) | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, _INCREMENTAL_SCHEMA_BUILDER) | ||
aldogonzalez8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._create_sliced_job(start_date, first_upper_boundary, _INCREMENTAL_FIELDS, "first_slice_job_id", record_count=2) | ||
self._create_sliced_job(first_upper_boundary, _NOW, _INCREMENTAL_FIELDS, "second_slice_job_id", record_count=1) | ||
|
||
output = read(_STREAM_NAME, SyncMode.incremental, self._config) | ||
|
||
|
@@ -99,22 +301,22 @@ def test_given_slice_fails_when_read_then_state_is_partitioned(self) -> None: | |
first_upper_boundary = start_date + slice_range | ||
second_upper_boundary = first_upper_boundary + slice_range | ||
self._config.start_date(start_date).stream_slice_step("P7D") | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, SalesforceDescribeResponseBuilder().field(_A_FIELD_NAME).field(_CURSOR_FIELD, "datetime")) | ||
self._create_sliced_job(start_date, first_upper_boundary, "first_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=2)) | ||
given_stream(self._http_mocker, _BASE_URL, _STREAM_NAME, _INCREMENTAL_SCHEMA_BUILDER) | ||
self._create_sliced_job(start_date, first_upper_boundary, _INCREMENTAL_FIELDS, "first_slice_job_id", record_count=2) | ||
self._http_mocker.post( | ||
self._create_job_creation_request(first_upper_boundary, second_upper_boundary), | ||
self._make_sliced_job_request(first_upper_boundary, second_upper_boundary, _INCREMENTAL_FIELDS), | ||
HttpResponse("", status_code=400), | ||
) | ||
self._create_sliced_job(second_upper_boundary, _NOW, "third_slice_job_id", self._generate_csv([_A_FIELD_NAME, _CURSOR_FIELD], count=1)) | ||
self._create_sliced_job(second_upper_boundary, _NOW, _INCREMENTAL_FIELDS, "third_slice_job_id", record_count=1) | ||
|
||
output = read(_STREAM_NAME, SyncMode.incremental, self._config) | ||
|
||
assert len(output.records) == 3 | ||
assert len(output.most_recent_state.stream_state.dict()["slices"]) == 2 | ||
|
||
def _create_sliced_job(self, start_date: datetime, first_upper_boundary: datetime, job_id: str, job_result: str) -> None: | ||
def _create_sliced_job(self, lower_boundary: datetime, upper_boundary: datetime, fields: List[str], job_id: str, record_count: int) -> None: | ||
self._http_mocker.post( | ||
self._create_job_creation_request(start_date, first_upper_boundary), | ||
self._make_sliced_job_request(lower_boundary, upper_boundary, fields), | ||
HttpResponse(json.dumps({"id": job_id})), | ||
) | ||
self._http_mocker.get( | ||
|
@@ -123,24 +325,37 @@ def _create_sliced_job(self, start_date: datetime, first_upper_boundary: datetim | |
) | ||
self._http_mocker.get( | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}/results"), | ||
HttpResponse(job_result), | ||
HttpResponse(self._generate_csv(fields, count=record_count)), | ||
) | ||
self._mock_delete_job(job_id) | ||
|
||
def _mock_delete_job(self, job_id: str) -> None: | ||
self._http_mocker._mock_request_method( # FIXME to add DELETE method in airbyte_cdk tests | ||
"delete", | ||
HttpRequest(f"{_BASE_URL}/jobs/query/{job_id}"), | ||
HttpResponse(job_result), | ||
HttpResponse(""), | ||
) | ||
|
||
def _create_job_creation_request(self, start_date: datetime, first_upper_boundary: datetime) -> HttpRequest: | ||
def _make_sliced_job_request(self, lower_boundary: datetime, upper_boundary: datetime, fields: List[str]) -> HttpRequest: | ||
return self._build_job_creation_request(f"SELECT {', '.join(fields)} FROM a_stream_name WHERE SystemModstamp >= {lower_boundary.isoformat(timespec='milliseconds')} AND SystemModstamp < {upper_boundary.isoformat(timespec='milliseconds')}") | ||
|
||
def _make_full_job_request(self, fields: List[str]) -> HttpRequest: | ||
return self._build_job_creation_request(f"SELECT {', '.join(fields)} FROM a_stream_name") | ||
|
||
def _generate_csv(self, fields: List[str], count: int = 1) -> str: | ||
""" | ||
This method does not handle field types for now which may cause some test failures on change if we start considering using some | ||
fields for calculation. One example of that would be cursor field parsing to datetime. | ||
""" | ||
record = ','.join([f"{field}_value" for field in fields]) | ||
records = '\n'.join([record for _ in range(count)]) | ||
return f"{','.join(fields)}\n{records}" | ||
|
||
def _build_job_creation_request(self, query: str) -> HttpRequest: | ||
return HttpRequest(f"{_BASE_URL}/jobs/query", body=json.dumps({ | ||
"operation": "queryAll", | ||
"query": f"SELECT a_field, SystemModstamp FROM a_stream_name WHERE SystemModstamp >= {start_date.isoformat(timespec='milliseconds')} AND SystemModstamp < {first_upper_boundary.isoformat(timespec='milliseconds')}", | ||
"query": query, | ||
"contentType": "CSV", | ||
"columnDelimiter": "COMMA", | ||
"lineEnding": "LF" | ||
})) | ||
|
||
def _generate_csv(self, fields: List[str], count: int = 1) -> str: | ||
record = ','.join([f"{field}_value" for field in fields]) | ||
records = '\n'.join([record for _ in range(count)]) | ||
return f"{','.join(fields)}\n{records}" |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to
test_given_locator_when_read_then_extract_records_from_both_pages