Commit b9d4f12

Source Salesforce: handle too many properties (#22597)
* #1403 source Salesforce: handle too many properties
* #1403 source Salesforce: update changelog
* #1403 source salesforce: log warning and skip inconsistent records
* #1403 source salesforce: review fixes
* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent fd6497c commit b9d4f12
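What the change does: REST queries carry the SOQL SELECT field list in the request URL, so objects with thousands of properties can exceed the URL length limit and fail with "URI Too Long". The connector previously rerouted such objects to the BULK API, which not every object supports. After this commit they stay on REST: the field list is split into chunks that fit the limit, each chunk is queried separately, and the partial records are merged by primary key. Below is a minimal standalone sketch of the chunking heuristic in the spirit of the chunk_properties method added in this commit; the constant value and the free function are illustrative placeholders, not the connector's API:

from typing import Any, Dict, Iterable, Mapping

# Assumed budget; the connector computes it as Salesforce.REQUEST_SIZE_LIMITS - 2000.
MAX_PROPERTIES_LENGTH = 16_000

def chunk_properties(properties: Mapping[str, Any]) -> Iterable[Dict[str, Any]]:
    """Split a JSON-schema properties mapping so that each chunk's comma-joined
    field names stay under the URL length budget."""
    chunk: Dict[str, Any] = {}
    length = 0
    for name, value in properties.items():
        cost = len(name) + 1  # +1 for the comma between fields in the SOQL SELECT
        if length + cost >= MAX_PROPERTIES_LENGTH:
            yield chunk
            chunk, length = {}, 0
        chunk[name] = value
        length += cost
    if chunk:
        yield chunk

# A wide object: 4,000 custom fields will not fit into a single SELECT clause.
fields = {f"Custom_Field_{i}__c": {"type": "string"} for i in range(4000)}
print([len(c) for c in chunk_properties(fields)])  # several chunks instead of one query

Streams that need chunking but have no primary key cannot be merged back together, so they are now skipped with a warning instead of failing mid-sync.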

8 files changed: +257 −51 lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -1565,7 +1565,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 2.0.0
+  dockerImageTag: 2.0.1
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
@@ -13106,7 +13106,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:2.0.0"
+- dockerImage: "airbyte/source-salesforce:2.0.1"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
     connectionSpecification:

airbyte-integrations/connectors/source-salesforce/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -13,5 +13,5 @@ RUN pip install .

 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=2.0.0
+LABEL io.airbyte.version=2.0.1
 LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

Lines changed: 15 additions & 14 deletions
@@ -17,7 +17,7 @@
 from requests import codes, exceptions  # type: ignore[import]

 from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
-from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalSalesforceStream, SalesforceStream
+from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalRestSalesforceStream, RestSalesforceStream


 class AirbyteStopSync(AirbyteTracedException):
@@ -59,17 +59,10 @@ def _get_api_type(cls, stream_name, properties):
         properties_not_supported_by_bulk = {
             key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
         }
-        properties_length = len(",".join(p for p in properties))
-
         rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
-        # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
-        # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
-        bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
-
-        if rest_required and not bulk_required:
+        if rest_required:
             return "rest"
-        if not rest_required:
-            return "bulk"
+        return "bulk"

     @classmethod
     def generate_streams(
@@ -79,6 +72,7 @@ def generate_streams(
         sf_object: Salesforce,
     ) -> List[Stream]:
         """ "Generates a list of stream by their names. It can be used for different tests too"""
+        logger = logging.getLogger()
         authenticator = TokenAuthenticator(sf_object.access_token)
         stream_properties = sf_object.generate_schemas(stream_objects)
         streams = []
@@ -88,7 +82,7 @@

             api_type = cls._get_api_type(stream_name, selected_properties)
             if api_type == "rest":
-                full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
+                full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream
             elif api_type == "bulk":
                 full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
             else:
@@ -98,10 +92,17 @@
             pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
             streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
             if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
-                streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config.get("start_date")))
+                stream = incremental(**streams_kwargs, replication_key=replication_key, start_date=config.get("start_date"))
             else:
-                streams.append(full_refresh(**streams_kwargs))
-
+                stream = full_refresh(**streams_kwargs)
+            if api_type == "rest" and not stream.primary_key and stream.too_many_properties:
+                logger.warning(
+                    f"Can not instantiate stream {stream_name}. "
+                    f"It is not supported by the BULK API and can not be implemented via REST because the number of its properties "
+                    f"exceeds the limit and it lacks a primary key."
+                )
+                continue
+            streams.append(stream)
         return streams

     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
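Net effect of the source.py changes: _get_api_type no longer reroutes wide objects to BULK (the old bulk_required heuristic is gone, since property chunking now keeps REST requests under the limit), and generate_streams refuses to instantiate a REST stream that both exceeds the property-length limit and lacks a primary key, because its chunked responses could not be stitched back together. A condensed, illustrative restatement of that decision as a standalone function (not part of the connector):

def pick_api(rest_required: bool, has_primary_key: bool, too_many_properties: bool) -> str:
    """Mirror of _get_api_type plus the new skip rule in generate_streams."""
    api_type = "rest" if rest_required else "bulk"
    if api_type == "rest" and too_many_properties and not has_primary_key:
        return "skip"  # generate_streams logs a warning and drops the stream
    return api_type

assert pick_api(rest_required=False, has_primary_key=False, too_many_properties=True) == "bulk"
assert pick_api(rest_required=True, has_primary_key=True, too_many_properties=True) == "rest"
assert pick_api(rest_required=True, has_primary_key=False, too_many_properties=True) == "skip"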

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

Lines changed: 144 additions & 25 deletions
@@ -9,14 +9,14 @@
 import time
 from abc import ABC
 from contextlib import closing
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
+from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union

 import pandas as pd
 import pendulum
 import requests  # type: ignore[import]
 from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
-from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
+from airbyte_cdk.sources.streams.core import Stream, StreamData
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from numpy import nan
@@ -38,6 +38,7 @@ class SalesforceStream(HttpStream, ABC):
     page_size = 2000
     transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     encoding = DEFAULT_ENCODING
+    MAX_PROPERTIES_LENGTH = Salesforce.REQUEST_SIZE_LIMITS - 2000

     def __init__(
         self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs
@@ -65,6 +66,31 @@ def url_base(self) -> str:
     def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
         return None

+    @property
+    def too_many_properties(self):
+        selected_properties = self.get_json_schema().get("properties", {})
+        properties_length = len(",".join(p for p in selected_properties))
+        return properties_length > self.MAX_PROPERTIES_LENGTH
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        yield from response.json()["records"]
+
+    def get_json_schema(self) -> Mapping[str, Any]:
+        if not self.schema:
+            self.schema = self.sf_api.generate_schema(self.name)
+        return self.schema
+
+    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
+        if isinstance(exception, exceptions.ConnectionError):
+            return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later."
+        return super().get_error_display_message(exception)
+
+
+class RestSalesforceStream(SalesforceStream):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        assert self.primary_key or not self.too_many_properties
+
     def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str:
         if next_page_token:
             """
@@ -80,7 +106,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
         return {"next_token": next_token} if next_token else None

     def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+        self,
+        stream_state: Mapping[str, Any],
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+        property_chunk: Mapping[str, Any] = None,
     ) -> MutableMapping[str, Any]:
         """
         Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm
@@ -91,32 +121,44 @@
             """
             return {}

-        selected_properties = self.get_json_schema().get("properties", {})
-        query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
+        property_chunk = property_chunk or {}
+        query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "

         if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
             query += f"ORDER BY {self.primary_key} ASC"

         return {"q": query}

-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        yield from response.json()["records"]
+    def chunk_properties(self) -> Iterable[Mapping[str, Any]]:
+        selected_properties = self.get_json_schema().get("properties", {})

-    def get_json_schema(self) -> Mapping[str, Any]:
-        if not self.schema:
-            self.schema = self.sf_api.generate_schema(self.name)
-        return self.schema
+        summary_length = 0
+        local_properties = {}
+        for property_name, value in selected_properties.items():
+            current_property_length = len(property_name) + 1  # properties are split with commas
+            if current_property_length + summary_length >= self.MAX_PROPERTIES_LENGTH:
+                yield local_properties
+                local_properties = {}
+                summary_length = 0
+
+            local_properties[property_name] = value
+            summary_length += current_property_length
+
+        if local_properties:
+            yield local_properties

     def read_records(
         self,
         sync_mode: SyncMode,
         cursor_field: List[str] = None,
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
+    ) -> Iterable[StreamData]:
         try:
-            yield from super().read_records(
-                sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
+            yield from self._read_pages(
+                lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state),
+                stream_slice,
+                stream_state,
             )
         except exceptions.HTTPError as error:
             """
@@ -135,10 +177,83 @@ def read_records(
                 return
             raise error

-    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
-        if isinstance(exception, exceptions.ConnectionError):
-            return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later."
-        return super().get_error_display_message(exception)
+    def _read_pages(
+        self,
+        records_generator_fn: Callable[
+            [requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
+        ],
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[StreamData]:
+        stream_state = stream_state or {}
+        pagination_complete = False
+        records = {}
+        next_pages = {}
+
+        while not pagination_complete:
+            index = 0
+            for index, property_chunk in enumerate(self.chunk_properties()):
+                request, response = self._fetch_next_page(stream_slice, stream_state, next_pages.get(index), property_chunk)
+                next_pages[index] = self.next_page_token(response)
+                chunk_page_records = records_generator_fn(request, response, stream_state, stream_slice)
+                if not self.too_many_properties:
+                    # this is the case when a stream has no primary key
+                    # (is allowed when properties length does not exceed the maximum value)
+                    # so there would be a single iteration, therefore we may and should yield records immediately
+                    yield from chunk_page_records
+                    break
+                chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}
+
+                for record_id, record in chunk_page_records.items():
+                    if record_id not in records:
+                        records[record_id] = (record, 1)
+                        continue
+                    incomplete_record, counter = records[record_id]
+                    incomplete_record.update(record)
+                    counter += 1
+                    records[record_id] = (incomplete_record, counter)
+
+            for record_id, (record, counter) in records.items():
+                if counter != index + 1:
+                    # Because we make multiple calls to query N records (each call to fetch X properties of all the N records),
+                    # there's a chance that the number of records corresponding to the query may change between the calls. This
+                    # may result in data inconsistency. We skip such records for now and log a warning message.
+                    self.logger.warning(
+                        f"Inconsistent record with primary key {record_id} found. It consists of {counter} chunks instead of {index + 1}. "
+                        f"Skipping it."
+                    )
+                    continue
+                yield record
+
+            records = {}
+
+            if not any(next_pages.values()):
+                pagination_complete = True
+
+        # Always return an empty generator just in case no records were ever yielded
+        yield from []
+
+    def _fetch_next_page(
+        self,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+        property_chunk: Mapping[str, Any] = None,
+    ) -> Tuple[requests.PreparedRequest, requests.Response]:
+        request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
+        request = self._create_prepared_request(
+            path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
+            headers=dict(request_headers, **self.authenticator.get_auth_header()),
+            params=self.request_params(
+                stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, property_chunk=property_chunk
+            ),
+            json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
+            data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
+        )
+        request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
+
+        response = self._send_request(request, request_kwargs)
+        return request, response


 class BulkSalesforceStream(SalesforceStream):
@@ -406,10 +521,10 @@ def get_standard_instance(self) -> SalesforceStream:
             sobject_options=self.sobject_options,
             authenticator=self.authenticator,
         )
-        new_cls: Type[SalesforceStream] = SalesforceStream
+        new_cls: Type[SalesforceStream] = RestSalesforceStream
         if isinstance(self, BulkIncrementalSalesforceStream):
             stream_kwargs.update({"replication_key": self.replication_key, "start_date": self.start_date})
-            new_cls = IncrementalSalesforceStream
+            new_cls = IncrementalRestSalesforceStream

         return new_cls(**stream_kwargs)

@@ -426,7 +541,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
     return instance


-class IncrementalSalesforceStream(SalesforceStream, ABC):
+class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
     state_checkpoint_interval = 500

     def __init__(self, replication_key: str, start_date: Optional[str], **kwargs):
@@ -442,20 +557,24 @@ def format_start_date(start_date: Optional[str]) -> Optional[str]:
         return None

     def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+        self,
+        stream_state: Mapping[str, Any],
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+        property_chunk: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
         if next_page_token:
             """
             If `next_page_token` is set, subsequent requests use `nextRecordsUrl`, and do not include any parameters.
             """
             return {}

-        selected_properties = self.get_json_schema().get("properties", {})
+        property_chunk = property_chunk or {}

         stream_date = stream_state.get(self.cursor_field)
         start_date = stream_date or self.start_date

-        query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} "
+        query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "
         if start_date:
             query += f"WHERE {self.cursor_field} >= {start_date} "
         if self.name not in UNSUPPORTED_FILTERING_STREAMS:
@@ -477,7 +596,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
             return {self.cursor_field: latest_benchmark}


-class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalSalesforceStream):
+class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream):
     def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]:
         if self.name not in UNSUPPORTED_FILTERING_STREAMS:
             page_token: str = last_record[self.cursor_field]
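The heart of the streams.py change is the _read_pages override above: for every page it issues one request per property chunk, keys the partial records by primary key, merges them, and drops any record that did not come back from every chunk, because the underlying result set can shift between the chunked calls. A self-contained sketch of that merge-and-skip rule, operating on hypothetical pre-fetched chunk pages instead of live HTTP responses:

from typing import Any, Dict, Iterable, List, Mapping, Tuple

def merge_chunk_pages(pages: List[List[Mapping[str, Any]]], pk: str = "Id") -> Iterable[Dict[str, Any]]:
    """Merge per-chunk result pages by primary key; skip records missing from any chunk."""
    merged: Dict[Any, Tuple[Dict[str, Any], int]] = {}
    for page in pages:
        for record in page:
            partial, count = merged.get(record[pk], ({}, 0))
            partial.update(record)
            merged[record[pk]] = (partial, count + 1)
    for record_id, (record, count) in merged.items():
        if count != len(pages):
            print(f"Inconsistent record with primary key {record_id}: {count} of {len(pages)} chunks, skipping")
            continue
        yield record

# "b" was returned by the first chunked query but not the second, so it is skipped.
chunk_1 = [{"Id": "a", "Name": "Ada"}, {"Id": "b", "Name": "Bob"}]
chunk_2 = [{"Id": "a", "Email": "ada@example.com"}]
print(list(merge_chunk_pages([chunk_1, chunk_2])))  # [{'Id': 'a', 'Name': 'Ada', 'Email': 'ada@example.com'}]

The real method does this page by page and tracks a separate next_page_token per chunk, but the reconciliation rule is the same: a record is emitted only when its chunk counter equals the number of chunks fetched.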
