Skip to content

Commit 676b52e

Browse files
authored
✨Source Salesforce: Migrating to non-deprecated authenticator (#38065)
1 parent d26bd10 commit 676b52e

File tree

9 files changed

+24
-16
lines changed

9 files changed

+24
-16
lines changed

airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
from datetime import datetime
99
from pathlib import Path
10+
from typing import Dict
1011

1112
import pendulum
1213
import pytest
@@ -39,6 +40,10 @@ def sf(input_sandbox_config):
3940
return sf
4041

4142

43+
def _authentication_headers(salesforce: Salesforce) -> Dict[str, str]:
44+
return {"Authorization": f"Bearer {salesforce.access_token}"}
45+
46+
4247
@pytest.fixture(scope="module")
4348
def stream_name():
4449
return "ContentNote"
@@ -75,8 +80,8 @@ def get_stream_state():
7580
return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}
7681

7782

78-
def test_update_for_deleted_record(stream):
79-
headers = stream.authenticator.get_auth_header()
83+
def test_update_for_deleted_record(stream, sf):
84+
headers = _authentication_headers(sf)
8085
stream_state = get_stream_state()
8186
time.sleep(1)
8287
response = create_note(stream, headers)
@@ -138,8 +143,8 @@ def test_update_for_deleted_record(stream):
138143
assert response.status_code == 404, "Expected an update to a deleted note to return 404"
139144

140145

141-
def test_deleted_record(stream):
142-
headers = stream.authenticator.get_auth_header()
146+
def test_deleted_record(stream, sf):
147+
headers = _authentication_headers(sf)
143148
response = create_note(stream, headers)
144149
assert response.status_code == 201, "Note was note created"
145150

airbyte-integrations/connectors/source-salesforce/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: b117307c-14b6-41aa-9422-947e34922962
13-
dockerImageTag: 2.5.9
13+
dockerImageTag: 2.5.10
1414
dockerRepository: airbyte/source-salesforce
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
1616
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.5.9"
6+
version = "2.5.10"
77
name = "source-salesforce"
88
description = "Source implementation for Salesforce."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from airbyte_cdk.sources.streams import Stream
2121
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
2222
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, FinalStateCursor
23-
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
23+
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
2424
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
2525
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2626
from airbyte_protocol.models import FailureType

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def _fetch_next_page_for_chunk(
309309
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
310310
request = self._create_prepared_request(
311311
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
312-
headers=dict(request_headers, **self.authenticator.get_auth_header()),
312+
headers=dict(request_headers),
313313
params=self.request_params(
314314
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, property_chunk=property_chunk
315315
),
@@ -365,7 +365,6 @@ def _send_http_request(self, method: str, url: str, json: dict = None, headers:
365365
return self._non_retryable_send_http_request(method, url, json, headers, stream)
366366

367367
def _non_retryable_send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
368-
headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
369368
response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
370369
if response.status_code not in [200, 204]:
371370
self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
@@ -706,7 +705,7 @@ def get_standard_instance(self) -> SalesforceStream:
706705
stream_name=self.stream_name,
707706
schema=self.schema,
708707
sobject_options=self.sobject_options,
709-
authenticator=self.authenticator,
708+
authenticator=self._session.auth,
710709
)
711710
new_cls: Type[SalesforceStream] = RestSalesforceStream
712711
if isinstance(self, BulkIncrementalSalesforceStream):

airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_bulk_stream.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def setUp(self) -> None:
6464
self._http_mocker = HttpMocker()
6565
self._http_mocker.__enter__()
6666

67-
given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL)
67+
given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL, _ACCESS_TOKEN)
6868

6969
def tearDown(self) -> None:
7070
self._http_mocker.__exit__(None, None, None)
@@ -197,7 +197,7 @@ def test_given_job_is_failed_when_read_then_switch_to_standard(self):
197197
JobInfoResponseBuilder().with_id(_JOB_ID).with_state("Failed").build(),
198198
)
199199
self._http_mocker.get(
200-
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]),
200+
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME], _ACCESS_TOKEN),
201201
create_standard_http_response([_A_FIELD_NAME]),
202202
)
203203
self._mock_delete_job(_JOB_ID)

airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_rest_stream.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]
2929

3030

31-
def create_http_request(stream_name: str, field_names: List[str]) -> HttpRequest:
32-
return HttpRequest(f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+")
31+
def create_http_request(stream_name: str, field_names: List[str], access_token: Optional[str] = None) -> HttpRequest:
32+
return HttpRequest(
33+
f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+",
34+
headers={"Authorization": f"Bearer {access_token}"} if access_token else None
35+
)
3336

3437

3538
def create_http_response(field_names: List[str], record_count: int = 1) -> HttpResponse:

airbyte-integrations/connectors/source-salesforce/unit_tests/integration/utils.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ def read(
4343
return entrypoint_read(_source(catalog, config, state), config, catalog, state, expecting_exception)
4444

4545

46-
def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str) -> None:
46+
def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str, access_token: str = "any_access_token") -> None:
4747
http_mocker.post(
4848
HttpRequest(
4949
"https://login.salesforce.com/services/oauth2/token",
5050
query_params=ANY_QUERY_PARAMS,
5151
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
5252
),
53-
HttpResponse(json.dumps({"access_token": "any_access_token", "instance_url": instance_url})),
53+
HttpResponse(json.dumps({"access_token": access_token, "instance_url": instance_url})),
5454
)
5555

5656

docs/integrations/sources/salesforce.md

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ Now that you have set up the Salesforce source connector, check out the followin
192192

193193
| Version | Date | Pull Request | Subject |
194194
| :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------- |
195+
| 2.5.10 | 2024-05-09 | [38065](https://github.com/airbytehq/airbyte/pull/38065) | Replace deprecated authentication mechanism to up-to-date one |
195196
| 2.5.9 | 2024-05-02 | [37749](https://github.com/airbytehq/airbyte/pull/37749) | Adding mock server tests for bulk streams |
196197
| 2.5.8 | 2024-04-30 | [37340](https://github.com/airbytehq/airbyte/pull/37340) | Source Salesforce: reduce info logs |
197198
| 2.5.7 | 2024-04-24 | [36657](https://github.com/airbytehq/airbyte/pull/36657) | Schema descriptions |

0 commit comments

Comments
 (0)