Skip to content

Commit 62c433e

Browse files
Source Salesforce: fix pagination in REST API streams (#9151)
* fix next_page_token
* fix BULK API
* fix BULK incremental stream
* added unit test and comments
* format code
* bump version
* updated spec and def yaml

Co-authored-by: auganbay <[email protected]>
1 parent 3f9cbec commit 62c433e

File tree

6 files changed

+116
-16
lines changed

6 files changed

+116
-16
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@
613613
- name: Salesforce
614614
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
615615
dockerRepository: airbyte/source-salesforce
616-
dockerImageTag: 0.1.15
616+
dockerImageTag: 0.1.16
617617
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
618618
icon: salesforce.svg
619619
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6425,7 +6425,7 @@
64256425
supportsNormalization: false
64266426
supportsDBT: false
64276427
supported_destination_sync_modes: []
6428-
- dockerImage: "airbyte/source-salesforce:0.1.15"
6428+
- dockerImage: "airbyte/source-salesforce:0.1.16"
64296429
spec:
64306430
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
64316431
connectionSpecification:

airbyte-integrations/connectors/source-salesforce/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
2525
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
2626
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
2727

28-
LABEL io.airbyte.version=0.1.15
28+
LABEL io.airbyte.version=0.1.16
2929
LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,29 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
4444
def url_base(self) -> str:
4545
return self.sf_api.instance_url
4646

47-
def path(self, **kwargs) -> str:
47+
def path(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> str:
48+
if next_page_token:
49+
"""
50+
If `next_page_token` is set, subsequent requests use `nextRecordsUrl`.
51+
"""
52+
return next_page_token
4853
return f"/services/data/{self.sf_api.version}/queryAll"
4954

5055
def next_page_token(self, response: requests.Response) -> str:
5156
response_data = response.json()
52-
if len(response_data["records"]) == self.page_size and self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
53-
return f"WHERE {self.primary_key} >= '{response_data['records'][-1][self.primary_key]}' "
57+
return response_data.get("nextRecordsUrl")
5458

5559
def request_params(
5660
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
5761
) -> MutableMapping[str, Any]:
5862
"""
5963
Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm
6064
"""
65+
if next_page_token:
66+
"""
67+
If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters.
68+
"""
69+
return {}
6170

6271
selected_properties = self.get_json_schema().get("properties", {})
6372

@@ -70,11 +79,9 @@ def request_params(
7079
}
7180

7281
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
73-
if next_page_token:
74-
query += next_page_token
7582

7683
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
77-
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"
84+
query += f"ORDER BY {self.primary_key} ASC"
7885

7986
return {"q": query}
8087

@@ -259,6 +266,32 @@ def next_page_token(self, last_record: dict) -> str:
259266
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
260267
return f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "
261268

269+
def request_params(
270+
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
271+
) -> MutableMapping[str, Any]:
272+
"""
273+
Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm
274+
"""
275+
276+
selected_properties = self.get_json_schema().get("properties", {})
277+
278+
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
279+
if self.sf_api.api_type == "BULK":
280+
selected_properties = {
281+
key: value
282+
for key, value in selected_properties.items()
283+
if value.get("format") != "base64" and "object" not in value["type"]
284+
}
285+
286+
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
287+
if next_page_token:
288+
query += next_page_token
289+
290+
if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
291+
query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"
292+
293+
return {"q": query}
294+
262295
def read_records(
263296
self,
264297
sync_mode: SyncMode,
@@ -305,14 +338,15 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]:
305338
if start_date:
306339
return pendulum.parse(start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
307340

308-
def next_page_token(self, response: requests.Response) -> str:
309-
response_data = response.json()
310-
if len(response_data["records"]) == self.page_size and self.name not in UNSUPPORTED_FILTERING_STREAMS:
311-
return response_data["records"][-1][self.cursor_field]
312-
313341
def request_params(
314342
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
315343
) -> MutableMapping[str, Any]:
344+
if next_page_token:
345+
"""
346+
If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters.
347+
"""
348+
return {}
349+
316350
selected_properties = self.get_json_schema().get("properties", {})
317351

318352
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
@@ -324,13 +358,13 @@ def request_params(
324358
}
325359

326360
stream_date = stream_state.get(self.cursor_field)
327-
start_date = next_page_token or stream_date or self.start_date
361+
start_date = stream_date or self.start_date
328362

329363
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
330364
if start_date:
331365
query += f"WHERE {self.cursor_field} >= {start_date} "
332366
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
333-
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}"
367+
query += f"ORDER BY {self.cursor_field} ASC"
334368
return {"q": query}
335369

336370
@property
@@ -352,3 +386,26 @@ class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalSalesforc
352386
def next_page_token(self, last_record: dict) -> str:
353387
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
354388
return last_record[self.cursor_field]
389+
390+
def request_params(
391+
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
392+
) -> MutableMapping[str, Any]:
393+
selected_properties = self.get_json_schema().get("properties", {})
394+
395+
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
396+
if self.sf_api.api_type == "BULK":
397+
selected_properties = {
398+
key: value
399+
for key, value in selected_properties.items()
400+
if value.get("format") != "base64" and "object" not in value["type"]
401+
}
402+
403+
stream_date = stream_state.get(self.cursor_field)
404+
start_date = next_page_token or stream_date or self.start_date
405+
406+
query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
407+
if start_date:
408+
query += f"WHERE {self.cursor_field} >= {start_date} "
409+
if self.name not in UNSUPPORTED_FILTERING_STREAMS:
410+
query += f"ORDER BY {self.cursor_field} ASC LIMIT {self.page_size}"
411+
return {"q": query}

airbyte-integrations/connectors/source-salesforce/unit_tests/unit_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,3 +349,45 @@ def test_discover_with_streams_criteria_param(streams_criteria, predicted_filter
349349
)
350350
filtered_streams = sf_object.get_validated_streams(config=updated_config)
351351
assert sorted(filtered_streams) == sorted(predicted_filtered_streams)
352+
353+
354+
def test_pagination_rest(stream_rest_config, stream_rest_api):
355+
stream: SalesforceStream = _generate_stream("Account", stream_rest_config, stream_rest_api)
356+
stream._wait_timeout = 0.1 # maximum wait timeout will be 6 seconds
357+
next_page_url = "/services/data/v52.0/query/012345"
358+
with requests_mock.Mocker() as m:
359+
resp_1 = {
360+
"done": False,
361+
"totalSize": 4,
362+
"nextRecordsUrl": next_page_url,
363+
"records": [
364+
{
365+
"ID": 1,
366+
"LastModifiedDate": "2021-11-15",
367+
},
368+
{
369+
"ID": 2,
370+
"LastModifiedDate": "2021-11-16",
371+
},
372+
],
373+
}
374+
resp_2 = {
375+
"done": True,
376+
"totalSize": 4,
377+
"records": [
378+
{
379+
"ID": 3,
380+
"LastModifiedDate": "2021-11-17",
381+
},
382+
{
383+
"ID": 4,
384+
"LastModifiedDate": "2021-11-18",
385+
},
386+
],
387+
}
388+
389+
m.register_uri("GET", stream.path(), json=resp_1)
390+
m.register_uri("GET", next_page_url, json=resp_2)
391+
392+
records = [record for record in stream.read_records(sync_mode=SyncMode.full_refresh)]
393+
assert len(records) == 4

docs/integrations/sources/salesforce.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ List of available streams:
737737

738738
| Version | Date | Pull Request | Subject |
739739
|:--------|:-----------| :--- |:--------------------------------------------------------------------------|
740+
| 0.1.16 | 2022-01-18 | [9151](https://github.com/airbytehq/airbyte/pull/9151) | Fix pagination in REST API streams |
740741
| 0.1.15 | 2022-01-11 | [9409](https://github.com/airbytehq/airbyte/pull/9409) | Correcting the presence of an extra `else` handler in the error handling |
741742
| 0.1.14 | 2022-01-11 | [9386](https://github.com/airbytehq/airbyte/pull/9386) | Handling 400 error when `sobject` doesn't support `query` or `queryAll` requests |
742743
| 0.1.13 | 2022-01-11 | [8797](https://github.com/airbytehq/airbyte/pull/8797) | Switched from authSpecification to advanced_auth in specification |

0 commit comments

Comments
 (0)