Skip to content

✨ Source Stripe: Enable concurrency on incremental syncs for balance_transactions, events, files, file_links and shipping_rates #34619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ acceptance_tests:
basic_read:
tests:
- config_path: "secrets/config.json"
fail_on_extra_columns: false # CATs are failing since https://github.com/airbytehq/airbyte/commit/dccb2fa7165f031fa1233d695897b07f9aacb39c, API Source team to fix this
timeout_seconds: 3600
empty_streams:
- name: "application_fees"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.2.1
dockerImageTag: 5.2.2
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk==0.59.1", "stripe==2.56.0", "pendulum==2.1.2"]
MAIN_REQUIREMENTS = ["airbyte-cdk==0.60.1", "stripe==2.56.0", "pendulum==2.1.2"]

# we set `requests-mock~=1.11.0` to ensure concurrency is supported
TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock~=1.11.0", "requests_mock~=1.8", "freezegun==1.2.2"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceStripe(
SourceStripe.read_catalog(catalog_path) if catalog_path else None,
SourceStripe.read_config(config_path) if config_path else None,
SourceStripe.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
import os
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
Expand All @@ -14,11 +14,14 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Comparable, ConcurrentCursor, CursorField, NoopCursor
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import SyncMode
Expand Down Expand Up @@ -49,8 +52,11 @@
class SourceStripe(ConcurrentSourceAdapter):

message_repository = InMemoryMessageRepository(entrypoint_logger.level)
_SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = {
Events: ("created[gte]", "created[lte]"),
}

def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
if config:
concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
else:
Expand All @@ -60,6 +66,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
concurrency_level, concurrency_level // 2, logger, self._slice_logger, self.message_repository
)
super().__init__(concurrent_source)
self._state = state
if catalog:
self._streams_configured_as_full_refresh = {
configured_stream.stream.name
Expand All @@ -71,9 +78,8 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
self._streams_configured_as_full_refresh = set()

@staticmethod
def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
start_date, lookback_window_days, slice_range = (
config.get("start_date"),
def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
lookback_window_days, slice_range = (
config.get("lookback_window_days"),
config.get("slice_range"),
)
Expand All @@ -86,9 +92,9 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
internal_message=message,
failure_type=FailureType.config_error,
)
if start_date:
# verifies the start_date is parseable
SourceStripe._start_date_to_timestamp(start_date)

# verifies the start_date in the config is valid
SourceStripe._start_date_to_timestamp(config)
if slice_range is None:
config["slice_range"] = 365
elif not isinstance(slice_range, int) or slice_range < 1:
Expand All @@ -100,7 +106,7 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
)
return config

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]:
self.validate_and_fill_with_defaults(config)
stripe.api_key = config["client_secret"]
try:
Expand Down Expand Up @@ -167,14 +173,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:

return HttpAPIBudget(policies=policies)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])

if "start_date" in config:
start_timestamp = self._start_date_to_timestamp(config["start_date"])
else:
start_timestamp = pendulum.datetime(2017, 1, 25).int_timestamp
start_timestamp = self._start_date_to_timestamp(config)
args = {
"authenticator": authenticator,
"account_id": config["account_id"],
Expand Down Expand Up @@ -511,21 +514,44 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
),
]

return [
StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor())
if stream.name in self._streams_configured_as_full_refresh
else stream
for stream in streams
]
state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state)
return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams]

def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream:
    """
    Adapt `stream` to the concurrent framework when it is eligible.

    Three outcomes are possible:
      * the stream's name is in `_streams_configured_as_full_refresh`: it is wrapped in a `StreamFacade`
        with an empty state and a `NoopCursor`;
      * the stream's exact type has an entry in `_SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION`: it is wrapped
        in a `StreamFacade` driven by a `ConcurrentCursor` built from the stream's state;
      * otherwise the stream is returned untouched.

    :param stream: the stream to adapt
    :param fallback_start: forwarded as the last positional argument of `ConcurrentCursor`
        (NOTE(review): presumably the start boundary used when there is no state - confirm against the CDK)
    :param state_manager: provides the per-stream state and is also handed to the cursor
    :return: either a `StreamFacade` around `stream` or `stream` itself
    """
    if stream.name in self._streams_configured_as_full_refresh:
        return StreamFacade.create_from_stream(
            stream,
            self,
            entrypoint_logger,
            self._create_empty_state(),
            NoopCursor(),
        )

    stream_state = state_manager.get_stream_state(stream.name, stream.namespace)

    # The lookup is keyed on the exact class of the stream, so subclasses of a registered implementation do not match.
    boundaries = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
    if not boundaries:
        return stream

    state_converter = EpochValueConcurrentStreamStateConverter()

    # `cursor_field` may be a plain string or a sequence; in the latter case only its first element is used.
    if isinstance(stream.cursor_field, str):
        cursor_field = CursorField(stream.cursor_field)
    else:
        cursor_field = CursorField(stream.cursor_field[0])

    cursor = ConcurrentCursor(
        stream.name,
        stream.namespace,
        # Deliberately fetched a second time instead of reusing `stream_state`.
        # NOTE(review): whether the manager returns a fresh copy per call is not visible here - confirm before merging the two.
        state_manager.get_stream_state(stream.name, stream.namespace),
        self.message_repository,
        state_manager,
        state_converter,
        cursor_field,
        boundaries,
        fallback_start,
    )
    return StreamFacade.create_from_stream(stream, self, entrypoint_logger, stream_state, cursor)

def _create_empty_state(self) -> MutableMapping[str, Any]:
# The state is known to be empty because concurrent CDK is currently only used for full refresh
return {}

@staticmethod
def _start_date_to_timestamp(start_date: str) -> int:
def _start_date_to_timestamp(config: Mapping[str, Any]) -> int:
if "start_date" not in config:
return pendulum.datetime(2017, 1, 25).int_timestamp # type: ignore # pendulum not typed

start_date = config["start_date"]
try:
return pendulum.parse(start_date).int_timestamp
return pendulum.parse(start_date).int_timestamp # type: ignore # pendulum not typed
except pendulum.parsing.exceptions.ParserError as e:
message = f"Invalid start date {start_date}. Please use YYYY-MM-DDTHH:MM:SSZ format."
raise AirbyteTracedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.test.state_builder import StateBuilder

os.environ["CACHE_DISABLED"] = "true"
os.environ["DEPLOYMENT_MODE"] = "testing"
Expand Down Expand Up @@ -40,7 +41,7 @@ def stream_by_name(config):
from source_stripe.source import SourceStripe

def mocker(stream_name, source_config=config):
source = SourceStripe(None, source_config)
source = SourceStripe(None, source_config, StateBuilder().build())
streams = source.streams(source_config)
for stream in streams:
if stream.name == stream_name:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand All @@ -19,7 +19,7 @@
find_template,
)
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
from airbyte_protocol.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType, SyncMode
from integration.config import ConfigBuilder
from integration.pagination import StripePaginationStrategy
from integration.request_builder import StripeRequestBuilder
Expand Down Expand Up @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -110,12 +110,12 @@ def _given_events_availability_check(http_mocker: HttpMocker) -> None:
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode,
state: Optional[Dict[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
Expand Down Expand Up @@ -372,5 +372,5 @@ def test_given_state_earlier_than_30_days_when_read_then_query_events_using_type
def _an_application_fee_event(self) -> RecordBuilder:
return _an_event().with_field(_DATA_FIELD, _an_application_fee().build())

def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput:
def _read(self, config: ConfigBuilder, state: Optional[List[AirbyteStateMessage]], expecting_exception: bool = False) -> EntrypointOutput:
return _read(config, SyncMode.incremental, state, expecting_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -62,8 +63,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -143,7 +144,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _assert_not_available(output: EntrypointOutput) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -115,7 +115,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -66,8 +67,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -146,7 +147,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _assert_not_available(output: EntrypointOutput) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -115,7 +115,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
Expand Down
Loading