Skip to content

Commit 267d23c

Browse files
committed
#1403 source salesforce: review fixes
1 parent cf4eaf8 commit 267d23c

File tree

2 files changed

+31
-14
lines changed

2 files changed

+31
-14
lines changed

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,16 +187,14 @@ def _read_pages(
187187
) -> Iterable[StreamData]:
188188
stream_state = stream_state or {}
189189
pagination_complete = False
190-
next_page_token = None
191190
records = {}
192191
next_pages = {}
193192

194193
while not pagination_complete:
195194
index = 0
196195
for index, property_chunk in enumerate(self.chunk_properties()):
197196
request, response = self._fetch_next_page(stream_slice, stream_state, next_pages.get(index), property_chunk)
198-
next_page_token = self.next_page_token(response)
199-
next_pages[index] = next_page_token
197+
next_pages[index] = self.next_page_token(response)
200198
chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
201199
if not self.too_many_properties:
202200
# this is the case when a stream has no primary key
@@ -208,25 +206,28 @@ def _read_pages(
208206

209207
for record_id, record in chunk_page_records.items():
210208
if record_id not in records:
211-
records[record_id] = (record, 0)
209+
records[record_id] = (record, 1)
212210
continue
213211
incomplete_record, counter = records[record_id]
214212
incomplete_record.update(record)
215213
counter += 1
216214
records[record_id] = (incomplete_record, counter)
217215

218216
for record_id, (record, counter) in records.items():
219-
if counter != index:
217+
if counter != index + 1:
220218
# Because we make multiple calls to query N records (each call to fetch X properties of all the N records),
221219
# there's a chance that the number of records corresponding to the query may change between the calls. This
222220
# may result in data inconsistency. We skip such records for now and log a warning message.
223-
self.logger.warning(f"Inconsistent record with primary key {record_id} found. Skipping it.")
221+
self.logger.warning(
222+
f"Inconsistent record with primary key {record_id} found. It consists of {counter} chunks instead of {index + 1}. "
223+
f"Skipping it."
224+
)
224225
continue
225226
yield record
226227

227228
records = {}
228229

229-
if not next_page_token:
230+
if not any(next_pages.values()):
230231
pagination_complete = True
231232

232233
# Always return an empty generator just in case no records were ever yielded

airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -625,37 +625,53 @@ def test_rest_stream_init_with_too_many_properties(stream_config, stream_api_v2_
625625

626626

627627
def test_too_many_properties(stream_config, stream_api_v2_pk_too_many_properties, requests_mock):
628+
stream = generate_stream("Account", stream_config, stream_api_v2_pk_too_many_properties)
629+
chunks = len(list(stream.chunk_properties()))
630+
assert stream.too_many_properties
631+
assert stream.primary_key
632+
assert type(stream) == RestSalesforceStream
633+
url = next_page_url = "https://fase-account.salesforce.com/services/data/v52.0/queryAll"
628634
requests_mock.get(
629-
"https://fase-account.salesforce.com/services/data/v52.0/queryAll",
635+
url,
630636
[
631637
{
632638
"json": {
633-
"nextRecordsUrl": "next",
639+
"nextRecordsUrl": next_page_url,
634640
"records": [{"Id": 1, "propertyA": "A"}, {"Id": 2, "propertyA": "A"}]
635641
}
636642
},
637643
{
638644
"json": {
645+
"nextRecordsUrl": next_page_url,
639646
"records": [{"Id": 1, "propertyB": "B"}, {"Id": 2, "propertyB": "B"}]
640647
}
641648
},
649+
# 2 for 2 chunks above and 1 for a chunk below
650+
*[{"json": {"records": [{"Id": 1}, {"Id": 2}], "nextRecordsUrl": next_page_url}} for _ in range(chunks - 3)],
651+
{
652+
"json": {
653+
"records": [{"Id": 1}, {"Id": 2}]
654+
}
655+
},
642656
{
643657
"json": {
644-
"nextRecordsUrl": "next",
645658
"records": [{"Id": 3, "propertyA": "A"}, {"Id": 4, "propertyA": "A"}]
646659
}
647660
},
648661
{
649662
"json": {
650663
"records": [{"Id": 3, "propertyB": "B"}, {"Id": 4, "propertyB": "B"}]
651664
}
665+
},
666+
# 2 for 2 chunks above and 1 for a chunk below
667+
*[{"json": {"records": [{"Id": 3}, {"Id": 4}]}} for _ in range(chunks - 3)],
668+
{
669+
"json": {
670+
"records": [{"Id": 3}, {"Id": 4}]
671+
}
652672
}
653673
]
654674
)
655-
stream = generate_stream("Account", stream_config, stream_api_v2_pk_too_many_properties)
656-
assert stream.too_many_properties
657-
assert stream.primary_key
658-
assert type(stream) == RestSalesforceStream
659675
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
660676
assert records == [
661677
{"Id": 1, "propertyA": "A", "propertyB": "B"},

0 commit comments

Comments
 (0)