
Commit 2617a03

[ISSUE #6829] update salesforce to support partitioned state (#36942)
1 parent 12612c9 commit 2617a03

17 files changed (+504 -271 lines)
airbyte-integrations/connectors/source-salesforce/.coveragerc (new file, +3)

@@ -0,0 +1,3 @@
+[run]
+omit =
+    source_salesforce/run.py

airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py (+6 -1)
@@ -12,6 +12,7 @@
 import pytest
 import requests
 from airbyte_cdk.models import SyncMode
+from airbyte_protocol.models import ConfiguredAirbyteCatalog
 from source_salesforce.api import Salesforce
 from source_salesforce.source import SourceSalesforce

@@ -20,6 +21,10 @@
 NOTE_CONTENT = "It's the note for integration test"
 UPDATED_NOTE_CONTENT = "It's the updated note for integration test"

+_ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []})
+_ANY_CONFIG = {}
+_ANY_STATE = {}
+

 @pytest.fixture(scope="module")
 def input_sandbox_config():
@@ -41,7 +46,7 @@ def stream_name():

 @pytest.fixture(scope="module")
 def stream(input_sandbox_config, stream_name, sf):
-    return SourceSalesforce.generate_streams(input_sandbox_config, {stream_name: None}, sf)[0]
+    return SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE).generate_streams(input_sandbox_config, {stream_name: None}, sf)[0]._legacy_stream


 def _encode_content(text):
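Note: generate_streams is now an instance method (see source.py below), so the test instantiates SourceSalesforce with placeholder catalog/config/state and unwraps the concurrent StreamFacade via _legacy_stream to reach the underlying stream.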

airbyte-integrations/connectors/source-salesforce/metadata.yaml (+1 -1)
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.4.4
+  dockerImageTag: 2.5.0
   dockerRepository: airbyte/source-salesforce
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/poetry.lock (+10 -10)

(Generated file; diff not rendered.)

airbyte-integrations/connectors/source-salesforce/pyproject.toml (+1 -1)
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "2.4.4"
+version = "2.5.0"
 name = "source-salesforce"
 description = "Source implementation for Salesforce."
 authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/availability_strategy.py (+1 -1)
@@ -29,7 +29,7 @@ def handle_http_error(
         if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]:
             error_data = error.response.json()[0]
             error_code = error_data.get("errorCode", "")
-            if error_code != "REQUEST_LIMIT_EXCEEDED" or error_code == "INVALID_TYPE_FOR_OPERATION":
+            if error_code != "REQUEST_LIMIT_EXCEEDED":
                 return False, f"Cannot receive data for stream '{stream.name}', error message: '{error_data.get('message')}'"
             return True, None
         raise error
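The dropped 'or error_code == "INVALID_TYPE_FOR_OPERATION"' clause was dead logic: any error code equal to INVALID_TYPE_FOR_OPERATION already satisfies error_code != "REQUEST_LIMIT_EXCEEDED", so the simplified condition is behavior-preserving.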

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py (+57 -39)
@@ -3,9 +3,10 @@
 #

 import logging
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

+import isodate
 import pendulum
 import requests
 from airbyte_cdk import AirbyteLogger
@@ -29,6 +30,7 @@

 from .api import PARENT_SALESFORCE_OBJECTS, UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
 from .streams import (
+    LOOKBACK_SECONDS,
     BulkIncrementalSalesforceStream,
     BulkSalesforceStream,
     BulkSalesforceSubStream,
@@ -172,9 +174,8 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec

         return stream_class, stream_kwargs

-    @classmethod
     def generate_streams(
-        cls,
+        self,
         config: Mapping[str, Any],
         stream_objects: Mapping[str, Any],
         sf_object: Salesforce,
@@ -184,69 +185,86 @@ def generate_streams(
         schemas = sf_object.generate_schemas(stream_objects)
         default_args = [sf_object, authenticator, config]
         streams = []
+        state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self.state)
         for stream_name, sobject_options in stream_objects.items():
             json_schema = schemas.get(stream_name, {})

-            stream_class, kwargs = cls.prepare_stream(stream_name, json_schema, sobject_options, *default_args)
+            stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args)

             parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name")
             if parent_name:
                 # get minimal schema required for getting proper class name full_refresh/incremental, rest/bulk
                 parent_schema = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("schema_minimal")
-                parent_class, parent_kwargs = cls.prepare_stream(parent_name, parent_schema, sobject_options, *default_args)
+                parent_class, parent_kwargs = self.prepare_stream(parent_name, parent_schema, sobject_options, *default_args)
                 kwargs["parent"] = parent_class(**parent_kwargs)

             stream = stream_class(**kwargs)

-            api_type = cls._get_api_type(stream_name, json_schema, config.get("force_use_bulk_api", False))
+            api_type = self._get_api_type(stream_name, json_schema, config.get("force_use_bulk_api", False))
             if api_type == "rest" and not stream.primary_key and stream.too_many_properties:
                 logger.warning(
                     f"Can not instantiate stream {stream_name}. It is not supported by the BULK API and can not be "
                     "implemented via REST because the number of its properties exceeds the limit and it lacks a primary key."
                 )
                 continue
-            streams.append(stream)
+
+            streams.append(self._wrap_for_concurrency(config, stream, state_manager))
+        streams.append(self._wrap_for_concurrency(config, Describe(sf_api=sf_object, catalog=self.catalog), state_manager))
         return streams

+    def _wrap_for_concurrency(self, config, stream, state_manager):
+        stream_slicer_cursor = None
+        if stream.cursor_field:
+            stream_slicer_cursor = self._create_stream_slicer_cursor(config, state_manager, stream)
+            if hasattr(stream, "set_cursor"):
+                stream.set_cursor(stream_slicer_cursor)
+        if hasattr(stream, "parent") and hasattr(stream.parent, "set_cursor"):
+            stream_slicer_cursor = self._create_stream_slicer_cursor(config, state_manager, stream)
+            stream.parent.set_cursor(stream_slicer_cursor)
+
+        if not stream_slicer_cursor or self._get_sync_mode_from_catalog(stream) == SyncMode.full_refresh:
+            cursor = FinalStateCursor(
+                stream_name=stream.name, stream_namespace=stream.namespace, message_repository=self.message_repository
+            )
+            state = None
+        else:
+            cursor = stream_slicer_cursor
+            state = cursor.state
+        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)
+
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         if not config.get("start_date"):
             config["start_date"] = (datetime.now() - relativedelta(years=self.START_DATE_OFFSET_IN_YEARS)).strftime(self.DATETIME_FORMAT)
         sf = self._get_sf_object(config)
         stream_objects = sf.get_validated_streams(config=config, catalog=self.catalog)
         streams = self.generate_streams(config, stream_objects, sf)
-        streams.append(Describe(sf_api=sf, catalog=self.catalog))
-        state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self.state)
-
-        configured_streams = []
-
-        for stream in streams:
-            sync_mode = self._get_sync_mode_from_catalog(stream)
-            if sync_mode == SyncMode.full_refresh:
-                cursor = FinalStateCursor(
-                    stream_name=stream.name, stream_namespace=stream.namespace, message_repository=self.message_repository
-                )
-                state = None
-            else:
-                cursor_field_key = stream.cursor_field or ""
-                if not isinstance(cursor_field_key, str):
-                    raise AssertionError(f"A string cursor field key is required, but got {cursor_field_key}.")
-                cursor_field = CursorField(cursor_field_key)
-                legacy_state = state_manager.get_stream_state(stream.name, stream.namespace)
-                cursor = ConcurrentCursor(
-                    stream.name,
-                    stream.namespace,
-                    legacy_state,
-                    self.message_repository,
-                    state_manager,
-                    stream.state_converter,
-                    cursor_field,
-                    self._get_slice_boundary_fields(stream, state_manager),
-                    config["start_date"],
-                )
-                state = cursor.state
+        return streams

-            configured_streams.append(StreamFacade.create_from_stream(stream, self, logger, state, cursor))
-        return configured_streams
+    def _create_stream_slicer_cursor(
+        self, config: Mapping[str, Any], state_manager: ConnectorStateManager, stream: Stream
+    ) -> ConcurrentCursor:
+        """
+        We have moved the generation of stream slices to the concurrent CDK cursor
+        """
+        cursor_field_key = stream.cursor_field or ""
+        if not isinstance(cursor_field_key, str):
+            raise AssertionError(f"Nested cursor fields are not supported; type str is expected but got {cursor_field_key}.")
+        cursor_field = CursorField(cursor_field_key)
+        stream_state = state_manager.get_stream_state(stream.name, stream.namespace)
+        return ConcurrentCursor(
+            stream.name,
+            stream.namespace,
+            stream_state,
+            self.message_repository,
+            state_manager,
+            stream.state_converter,
+            cursor_field,
+            self._get_slice_boundary_fields(stream, state_manager),
+            datetime.fromtimestamp(pendulum.parse(config["start_date"]).timestamp(), timezone.utc),
+            stream.state_converter.get_end_provider(),
+            timedelta(seconds=LOOKBACK_SECONDS),
+            isodate.parse_duration(config["stream_slice_step"]) if "stream_slice_step" in config else timedelta(days=30),
+        )

     def _get_slice_boundary_fields(self, stream: Stream, state_manager: ConnectorStateManager) -> Optional[Tuple[str, str]]:
         return ("start_date", "end_date")

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py (+21 -18)
@@ -11,14 +11,16 @@
 import uuid
 from abc import ABC
 from contextlib import closing
-from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
+from datetime import timedelta
+from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union

 import backoff
 import pandas as pd
 import pendulum
 import requests  # type: ignore[import]
 from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
 from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
+from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
 from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import IsoMillisConcurrentStreamStateConverter
 from airbyte_cdk.sources.streams.core import Stream, StreamData
 from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
@@ -44,7 +46,7 @@


 class SalesforceStream(HttpStream, ABC):
-    state_converter = IsoMillisConcurrentStreamStateConverter()
+    state_converter = IsoMillisConcurrentStreamStateConverter(is_sequential_state=False)
     page_size = 2000
     transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     encoding = DEFAULT_ENCODING
@@ -142,7 +144,7 @@ def __init__(self, properties: Mapping[str, Any]):


 class RestSalesforceStream(SalesforceStream):
-    state_converter = IsoMillisConcurrentStreamStateConverter()
+    state_converter = IsoMillisConcurrentStreamStateConverter(is_sequential_state=False)

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -320,7 +322,7 @@ def _fetch_next_page_for_chunk(


 class BatchedSubStream(HttpSubStream):
-    state_converter = IsoMillisConcurrentStreamStateConverter()
+    state_converter = IsoMillisConcurrentStreamStateConverter(is_sequential_state=False)
     SLICE_BATCH_SIZE = 200

     def stream_slices(
@@ -705,7 +707,10 @@ def get_standard_instance(self) -> SalesforceStream:
         stream_kwargs.update({"replication_key": self.replication_key, "start_date": self.start_date})
         new_cls = IncrementalRestSalesforceStream

-        return new_cls(**stream_kwargs)
+        standard_instance = new_cls(**stream_kwargs)
+        if hasattr(standard_instance, "set_cursor"):
+            standard_instance.set_cursor(self._stream_slicer_cursor)
+        return standard_instance


 class BulkSalesforceSubStream(BatchedSubStream, BulkSalesforceStream):
@@ -732,24 +737,22 @@ def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwar
         super().__init__(**kwargs)
         self.replication_key = replication_key
         self._stream_slice_step = stream_slice_step
+        self._stream_slicer_cursor = None
+
+    def set_cursor(self, cursor: Cursor) -> None:
+        self._stream_slicer_cursor = cursor

     def stream_slices(
         self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        now = pendulum.now(tz="UTC")
-        assert LOOKBACK_SECONDS is not None and LOOKBACK_SECONDS >= 0
-
-        initial_date = self.get_start_date_from_state(stream_state) - pendulum.Duration(seconds=LOOKBACK_SECONDS)
-        slice_start = initial_date
-        while slice_start < now:
-            slice_end = slice_start + self.stream_slice_step
-            self._slice = {
+        if not self._stream_slicer_cursor:
+            raise ValueError("Cursor should be set at this point")
+
+        for slice_start, slice_end in self._stream_slicer_cursor.generate_slices():
+            yield {
                 "start_date": slice_start.isoformat(timespec="milliseconds"),
-                "end_date": min(slice_end, now).isoformat(timespec="milliseconds"),
+                "end_date": slice_end.isoformat(timespec="milliseconds"),
             }
-            yield self._slice
-
-            slice_start += self.stream_slice_step

     @property
     def stream_slice_step(self) -> pendulum.Duration:
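With this change the stream no longer walks the date range itself: set_cursor hands it the concurrent cursor, and stream_slices only formats the (start, end) datetime pairs that generate_slices() yields. A stub of that contract (StubCursor is hypothetical, for illustration only):

from datetime import datetime, timedelta, timezone


class StubCursor:
    # Stands in for the concurrent CDK cursor: generate_slices() yields
    # (start, end) datetime pairs that cover the sync window.
    def generate_slices(self):
        start = datetime(2024, 1, 1, tzinfo=timezone.utc)
        step = timedelta(days=30)
        for i in range(2):
            yield start + i * step, start + (i + 1) * step


# The stream then only has to format each pair, as in the diff above:
for slice_start, slice_end in StubCursor().generate_slices():
    print({
        "start_date": slice_start.isoformat(timespec="milliseconds"),
        "end_date": slice_end.isoformat(timespec="milliseconds"),
    })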
@@ -829,7 +832,7 @@ def request_params(


 class Describe(Stream):
-    state_converter = IsoMillisConcurrentStreamStateConverter()
+    state_converter = IsoMillisConcurrentStreamStateConverter(is_sequential_state=False)
     """
     Stream of sObjects' (Salesforce Objects) describe:
     https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_sobject_describe.htm
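The recurring is_sequential_state=False change is what actually enables the partitioned state of the commit title: instead of collapsing progress into a single ascending cursor value, the converter persists explicit date-range slices, so concurrently processed partitions can checkpoint independently. Roughly (the key names below are an assumption based on the concurrent CDK's date-range state type, not shown in this diff):

# Sequential (legacy) state: one cursor value; everything before it is synced.
sequential_state = {"LastModifiedDate": "2024-01-31T00:00:00.000Z"}

# Partitioned (concurrent) state: explicit ranges, allowing non-contiguous
# progress to be persisted while slices complete out of order.
partitioned_state = {
    "state_type": "date-range",  # assumed key/value, per the CDK converter
    "slices": [
        {"start": "2020-01-01T00:00:00.000Z", "end": "2024-01-31T00:00:00.000Z"},
    ],
}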
