Skip to content

Commit 1a7cf20

Browse files
maxi297 authored and
xiaohansong committed
✨ Source Stripe: Events stream concurrent on incremental syncs (#34619)
1 parent dd4cb0f commit 1a7cf20

23 files changed

+176
-117
lines changed

airbyte-integrations/connectors/source-stripe/acceptance-test-config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ acceptance_tests:
1818
basic_read:
1919
tests:
2020
- config_path: "secrets/config.json"
21+
fail_on_extra_columns: false # CATs are failing since https://github.com/airbytehq/airbyte/commit/dccb2fa7165f031fa1233d695897b07f9aacb39c, API Source team to fix this
2122
timeout_seconds: 3600
2223
empty_streams:
2324
- name: "application_fees"

airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl

Lines changed: 6 additions & 5 deletions
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-stripe/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
13-
dockerImageTag: 5.2.1
13+
dockerImageTag: 5.2.2
1414
dockerRepository: airbyte/source-stripe
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
1616
githubIssueLabel: source-stripe

airbyte-integrations/connectors/source-stripe/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from setuptools import find_packages, setup
77

8-
MAIN_REQUIREMENTS = ["airbyte-cdk==0.59.1", "stripe==2.56.0", "pendulum==2.1.2"]
8+
MAIN_REQUIREMENTS = ["airbyte-cdk==0.60.1", "stripe==2.56.0", "pendulum==2.1.2"]
99

1010
# we set `requests-mock~=1.11.0` to ensure concurrency is supported
1111
TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock~=1.11.0", "requests_mock~=1.8", "freezegun==1.2.2"]

airbyte-integrations/connectors/source-stripe/source_stripe/run.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
def _get_source(args: List[str]):
1717
catalog_path = AirbyteEntrypoint.extract_catalog(args)
1818
config_path = AirbyteEntrypoint.extract_config(args)
19+
state_path = AirbyteEntrypoint.extract_state(args)
1920
try:
2021
return SourceStripe(
2122
SourceStripe.read_catalog(catalog_path) if catalog_path else None,
2223
SourceStripe.read_config(config_path) if config_path else None,
24+
SourceStripe.read_state(state_path) if state_path else None,
2325
)
2426
except Exception as error:
2527
print(

airbyte-integrations/connectors/source-stripe/source_stripe/source.py

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
1515
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
1616
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
17+
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1718
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
19+
from airbyte_cdk.sources.source import TState
1820
from airbyte_cdk.sources.streams import Stream
1921
from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate
2022
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
21-
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
23+
from airbyte_cdk.sources.streams.concurrent.cursor import Comparable, ConcurrentCursor, CursorField, NoopCursor
24+
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
2225
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
2326
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2427
from airbyte_protocol.models import SyncMode
@@ -42,15 +45,23 @@
4245
_MAX_CONCURRENCY = 20
4346
_DEFAULT_CONCURRENCY = 10
4447
_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
48+
_REFUND_STREAM_NAME = "refunds"
49+
_INCREMENTAL_CONCURRENCY_EXCLUSION = {
50+
_REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332
51+
}
4552
USE_CACHE = not _CACHE_DISABLED
4653
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"
4754

4855

4956
class SourceStripe(ConcurrentSourceAdapter):
5057

5158
message_repository = InMemoryMessageRepository(entrypoint_logger.level)
59+
_SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = {
60+
Events: ("created[gte]", "created[lte]"),
61+
CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"),
62+
}
5263

53-
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs):
64+
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
5465
if config:
5566
concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
5667
else:
@@ -60,6 +71,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
6071
concurrency_level, concurrency_level // 2, logger, self._slice_logger, self.message_repository
6172
)
6273
super().__init__(concurrent_source)
74+
self._state = state
6375
if catalog:
6476
self._streams_configured_as_full_refresh = {
6577
configured_stream.stream.name
@@ -71,9 +83,8 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
7183
self._streams_configured_as_full_refresh = set()
7284

7385
@staticmethod
74-
def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
75-
start_date, lookback_window_days, slice_range = (
76-
config.get("start_date"),
86+
def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
87+
lookback_window_days, slice_range = (
7788
config.get("lookback_window_days"),
7889
config.get("slice_range"),
7990
)
@@ -86,9 +97,9 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
8697
internal_message=message,
8798
failure_type=FailureType.config_error,
8899
)
89-
if start_date:
90-
# verifies the start_date is parseable
91-
SourceStripe._start_date_to_timestamp(start_date)
100+
101+
# verifies the start_date in the config is valid
102+
SourceStripe._start_date_to_timestamp(config)
92103
if slice_range is None:
93104
config["slice_range"] = 365
94105
elif not isinstance(slice_range, int) or slice_range < 1:
@@ -100,7 +111,7 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
100111
)
101112
return config
102113

103-
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
114+
def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]:
104115
self.validate_and_fill_with_defaults(config)
105116
stripe.api_key = config["client_secret"]
106117
try:
@@ -167,14 +178,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
167178

168179
return HttpAPIBudget(policies=policies)
169180

170-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
181+
def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
171182
config = self.validate_and_fill_with_defaults(config)
172183
authenticator = TokenAuthenticator(config["client_secret"])
173184

174-
if "start_date" in config:
175-
start_timestamp = self._start_date_to_timestamp(config["start_date"])
176-
else:
177-
start_timestamp = pendulum.datetime(2017, 1, 25).int_timestamp
185+
start_timestamp = self._start_date_to_timestamp(config)
178186
args = {
179187
"authenticator": authenticator,
180188
"account_id": config["account_id"],
@@ -289,7 +297,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
289297
# The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs.
290298
# Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe.
291299
# See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428
292-
CreatedCursorIncrementalStripeStream(name="refunds", path="refunds", **incremental_args),
300+
CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args),
293301
UpdatedCursorIncrementalStripeStream(
294302
name="payment_methods",
295303
path="payment_methods",
@@ -511,21 +519,44 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
511519
),
512520
]
513521

514-
return [
515-
StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor())
516-
if stream.name in self._streams_configured_as_full_refresh
517-
else stream
518-
for stream in streams
519-
]
522+
state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state)
523+
return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams]
524+
525+
def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream:
526+
if stream.name in self._streams_configured_as_full_refresh:
527+
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor())
528+
529+
state = state_manager.get_stream_state(stream.name, stream.namespace)
530+
slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
531+
if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION:
532+
cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0])
533+
converter = EpochValueConcurrentStreamStateConverter()
534+
cursor = ConcurrentCursor(
535+
stream.name,
536+
stream.namespace,
537+
state_manager.get_stream_state(stream.name, stream.namespace),
538+
self.message_repository,
539+
state_manager,
540+
converter,
541+
cursor_field,
542+
slice_boundary_fields,
543+
fallback_start,
544+
)
545+
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, state, cursor)
546+
547+
return stream
520548

521549
def _create_empty_state(self) -> MutableMapping[str, Any]:
522-
# The state is known to be empty because concurrent CDK is currently only used for full refresh
523550
return {}
524551

525552
@staticmethod
526-
def _start_date_to_timestamp(start_date: str) -> int:
553+
def _start_date_to_timestamp(config: Mapping[str, Any]) -> int:
554+
if "start_date" not in config:
555+
return pendulum.datetime(2017, 1, 25).int_timestamp # type: ignore # pendulum not typed
556+
557+
start_date = config["start_date"]
527558
try:
528-
return pendulum.parse(start_date).int_timestamp
559+
return pendulum.parse(start_date).int_timestamp # type: ignore # pendulum not typed
529560
except pendulum.parsing.exceptions.ParserError as e:
530561
message = f"Invalid start date {start_date}. Please use YYYY-MM-DDTHH:MM:SSZ format."
531562
raise AirbyteTracedException(

airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import os
66

77
import pytest
8+
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
89
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
10+
from airbyte_cdk.test.state_builder import StateBuilder
911

1012
os.environ["CACHE_DISABLED"] = "true"
1113
os.environ["DEPLOYMENT_MODE"] = "testing"
@@ -40,10 +42,14 @@ def stream_by_name(config):
4042
from source_stripe.source import SourceStripe
4143

4244
def mocker(stream_name, source_config=config):
43-
source = SourceStripe(None, source_config)
45+
source = SourceStripe(None, source_config, StateBuilder().build())
4446
streams = source.streams(source_config)
4547
for stream in streams:
4648
if stream.name == stream_name:
49+
if isinstance(stream, StreamFacade):
50+
# to avoid breaking changes for tests, we will return the legacy stream. Tests that would be affected by not having this
51+
# would probably need to be moved to integration tests or unit tests
52+
return stream._legacy_stream
4753
return stream
4854

4955
return mocker

airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
import json
43
from datetime import datetime, timedelta, timezone
5-
from typing import Any, Dict, Optional
4+
from typing import Any, Dict, List, Optional
65
from unittest import TestCase
76

87
import freezegun
8+
from airbyte_cdk.sources.source import TState
99
from airbyte_cdk.test.catalog_builder import CatalogBuilder
1010
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
1111
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -19,7 +19,7 @@
1919
find_template,
2020
)
2121
from airbyte_cdk.test.state_builder import StateBuilder
22-
from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
22+
from airbyte_protocol.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType, SyncMode
2323
from integration.config import ConfigBuilder
2424
from integration.pagination import StripePaginationStrategy
2525
from integration.request_builder import StripeRequestBuilder
@@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
5555
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()
5656

5757

58-
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
59-
return SourceStripe(catalog, config)
58+
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]]) -> SourceStripe:
59+
return SourceStripe(catalog, config, state)
6060

6161

6262
def _an_event() -> RecordBuilder:
@@ -110,12 +110,12 @@ def _given_events_availability_check(http_mocker: HttpMocker) -> None:
110110
def _read(
111111
config_builder: ConfigBuilder,
112112
sync_mode: SyncMode,
113-
state: Optional[Dict[str, Any]] = None,
113+
state: Optional[List[AirbyteStateMessage]] = None,
114114
expecting_exception: bool = False
115115
) -> EntrypointOutput:
116116
catalog = _catalog(sync_mode)
117117
config = config_builder.build()
118-
return read(_source(catalog, config), config, catalog, state, expecting_exception)
118+
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)
119119

120120

121121
@freezegun.freeze_time(_NOW.isoformat())
@@ -372,5 +372,5 @@ def test_given_state_earlier_than_30_days_when_read_then_query_events_using_type
372372
def _an_application_fee_event(self) -> RecordBuilder:
373373
return _an_event().with_field(_DATA_FIELD, _an_application_fee().build())
374374

375-
def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput:
375+
def _read(self, config: ConfigBuilder, state: Optional[List[AirbyteStateMessage]], expecting_exception: bool = False) -> EntrypointOutput:
376376
return _read(config, SyncMode.incremental, state, expecting_exception)

airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from unittest import TestCase
88

99
import freezegun
10+
from airbyte_cdk.sources.source import TState
1011
from airbyte_cdk.test.catalog_builder import CatalogBuilder
1112
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
1213
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -62,8 +63,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
6263
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()
6364

6465

65-
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
66-
return SourceStripe(catalog, config)
66+
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
67+
return SourceStripe(catalog, config, state)
6768

6869

6970
def _an_event() -> RecordBuilder:
@@ -143,7 +144,7 @@ def _read(
143144
) -> EntrypointOutput:
144145
catalog = _catalog(sync_mode)
145146
config = config_builder.build()
146-
return read(_source(catalog, config), config, catalog, state, expecting_exception)
147+
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)
147148

148149

149150
def _assert_not_available(output: EntrypointOutput) -> None:

airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
import json
43
from datetime import datetime, timedelta, timezone
54
from typing import Any, Dict, Optional
65
from unittest import TestCase
76

87
import freezegun
8+
from airbyte_cdk.sources.source import TState
99
from airbyte_cdk.test.catalog_builder import CatalogBuilder
1010
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
1111
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
5555
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()
5656

5757

58-
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
59-
return SourceStripe(catalog, config)
58+
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
59+
return SourceStripe(catalog, config, state)
6060

6161

6262
def _an_event() -> RecordBuilder:
@@ -115,7 +115,7 @@ def _read(
115115
) -> EntrypointOutput:
116116
catalog = _catalog(sync_mode)
117117
config = config_builder.build()
118-
return read(_source(catalog, config), config, catalog, state, expecting_exception)
118+
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)
119119

120120

121121
@freezegun.freeze_time(_NOW.isoformat())

0 commit comments

Comments
 (0)