Skip to content

Commit ceedfa3

Browse files
authored
[ONCALL #6352] stripe RFR pagination issue (#44862)
1 parent 89e890d commit ceedfa3

File tree

5 files changed

+144
-32
lines changed

5 files changed

+144
-32
lines changed

airbyte-integrations/connectors/source-stripe/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
13-
dockerImageTag: 5.5.1
13+
dockerImageTag: 5.5.2
1414
dockerRepository: airbyte/source-stripe
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
1616
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships

airbyte-integrations/connectors/source-stripe/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "5.5.1"
6+
version = "5.5.2"
77
name = "source-stripe"
88
description = "Source implementation for Stripe."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-stripe/source_stripe/streams.py

+18
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
from airbyte_cdk import BackoffStrategy
1515
from airbyte_cdk.models import SyncMode
1616
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ExponentialBackoffStrategy
17+
from airbyte_cdk.sources.streams.checkpoint import Cursor
18+
from airbyte_cdk.sources.streams.checkpoint.resumable_full_refresh_cursor import ResumableFullRefreshCursor
19+
from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import SubstreamResumableFullRefreshCursor
1720
from airbyte_cdk.sources.streams.core import StreamData
1821
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
1922
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
@@ -208,6 +211,21 @@ def retry_factor(self) -> float:
208211
"""
209212
return 0 if IS_TESTING else super(StripeStream, self).retry_factor
210213

214+
def get_cursor(self) -> Optional[Cursor]:
215+
"""
216+
RFR is breaking the pagination in Stripe today. The stream is instantiated using the stream facade here. During the read, this goes
217+
through the concurrent code here so that we can read full refresh streams concurrently.
218+
219+
However, as there are no cursors and the read records is the one from the HttpStream, we end up assigning the
220+
ResumableFullRefresCursor and hence only read a single page.
221+
222+
In order to avoid that, we will assume there are no cursor if the cursor if RFR.
223+
"""
224+
parent_cursor = super().get_cursor()
225+
if isinstance(parent_cursor, (ResumableFullRefreshCursor, SubstreamResumableFullRefreshCursor)):
226+
return None
227+
return parent_cursor
228+
211229

212230
class IStreamSelector(ABC):
213231
@abstractmethod
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from datetime import datetime, timedelta, timezone
4+
from unittest import TestCase
5+
6+
import freezegun
7+
from airbyte_cdk.test.catalog_builder import CatalogBuilder
8+
from airbyte_cdk.test.entrypoint_wrapper import read
9+
from airbyte_cdk.test.mock_http import HttpMocker
10+
from airbyte_cdk.test.mock_http.response_builder import (
11+
FieldPath,
12+
HttpResponseBuilder,
13+
RecordBuilder,
14+
create_record_builder,
15+
create_response_builder,
16+
find_template,
17+
)
18+
from airbyte_cdk.test.state_builder import StateBuilder
19+
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode
20+
from integration.config import ConfigBuilder
21+
from integration.pagination import StripePaginationStrategy
22+
from integration.request_builder import StripeRequestBuilder
23+
from source_stripe import SourceStripe
24+
25+
_STREAM_NAME = "accounts"
26+
_ACCOUNT_ID = "acct_1G9HZLIEn49ers"
27+
_CLIENT_SECRET = "ConfigBuilder default client secret"
28+
_NOW = datetime.now(timezone.utc)
29+
_CONFIG = {
30+
"client_secret": _CLIENT_SECRET,
31+
"account_id": _ACCOUNT_ID,
32+
}
33+
_NO_STATE = StateBuilder().build()
34+
_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1)
35+
36+
37+
def _create_config() -> ConfigBuilder:
38+
return ConfigBuilder().with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET)
39+
40+
41+
def _create_catalog(sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog:
42+
return CatalogBuilder().with_stream(name="accounts", sync_mode=sync_mode).build()
43+
44+
45+
def _create_accounts_request() -> StripeRequestBuilder:
46+
return StripeRequestBuilder.accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET)
47+
48+
49+
def _create_response() -> HttpResponseBuilder:
50+
return create_response_builder(
51+
response_template=find_template(_STREAM_NAME, __file__),
52+
records_path=FieldPath("data"),
53+
pagination_strategy=StripePaginationStrategy(),
54+
)
55+
56+
57+
def _create_record() -> RecordBuilder:
58+
return create_record_builder(
59+
find_template(_STREAM_NAME, __file__),
60+
FieldPath("data"),
61+
record_id_path=FieldPath("id"),
62+
)
63+
64+
65+
@freezegun.freeze_time(_NOW.isoformat())
66+
class AccountsTest(TestCase):
67+
@HttpMocker()
68+
def test_full_refresh(self, http_mocker: HttpMocker) -> None:
69+
http_mocker.get(
70+
_create_accounts_request().with_limit(100).build(),
71+
_create_response().with_record(record=_create_record()).build(),
72+
)
73+
74+
source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE)
75+
actual_messages = read(source, config=_CONFIG, catalog=_create_catalog())
76+
77+
assert len(actual_messages.records) == 1
78+
79+
@HttpMocker()
80+
def test_pagination(self, http_mocker: HttpMocker) -> None:
81+
http_mocker.get(
82+
_create_accounts_request().with_limit(100).build(),
83+
_create_response().with_record(record=_create_record().with_id("last_record_id_from_first_page")).with_pagination().build(),
84+
)
85+
http_mocker.get(
86+
_create_accounts_request().with_limit(100).with_starting_after("last_record_id_from_first_page").build(),
87+
_create_response().with_record(record=_create_record()).build(),
88+
)
89+
90+
source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE)
91+
actual_messages = read(source, config=_CONFIG, catalog=_create_catalog())
92+
93+
assert len(actual_messages.records) == 2

0 commit comments

Comments
 (0)