Skip to content

[source-stripe] taxes #59179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.8.6
dockerImageTag: 5.8.7
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.8.6"
version = "5.8.7"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"updated": {
"description": "The date and time when the transaction was created",
"type": ["null", "integer"]
},
"credit_note": {
"description": "Credit note related to the balance transaction",
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from source_stripe.streams import (
CreatedCursorIncrementalStripeStream,
CustomerBalanceTransactions,
Events,
IncrementalStripeStream,
ParentIncrementalStripeSubStream,
Expand Down Expand Up @@ -206,7 +207,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
],
**args,
)
subscription_items = UpdatedCursorIncrementalStripeLazySubStream(
subscription_items = ParentIncrementalStripeSubStream(
name="subscription_items",
path="subscription_items",
parent=subscriptions,
Expand All @@ -217,8 +218,6 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
},
cursor_field="subscription_updated",
use_cache=USE_CACHE,
sub_items_attr="items",
event_types=["customer.subscription.created", "customer.subscription.updated"],
**args,
)
transfers = IncrementalStripeStream(
Expand Down Expand Up @@ -505,13 +504,6 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
event_types=["topup.canceled", "topup.created", "topup.failed", "topup.reversed", "topup.succeeded"],
**args,
),
ParentIncrementalStripeSubStream(
name="customer_balance_transactions",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice['parent']['id']}/balance_transactions",
parent=self.customers(**args),
cursor_field="created",
**args,
),
UpdatedCursorIncrementalStripeLazySubStream(
name="application_fees_refunds",
path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice['parent']['id']}/refunds",
Expand Down Expand Up @@ -552,20 +544,11 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
},
**args,
),
UpdatedCursorIncrementalStripeLazySubStream(
ParentIncrementalStripeSubStream(
name="invoice_line_items",
path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice['parent']['id']}/lines",
parent=invoices,
cursor_field="invoice_updated",
event_types=[
"invoice.created",
"invoice.deleted",
"invoice.updated",
# the event type = "invoice.upcoming" doesn't contain the `primary_key = `id` field,
# thus isn't used, see the doc: https://docs.stripe.com/api/invoices/object#invoice_object-id
# reference issue: https://github.com/airbytehq/oncall/issues/5560
],
sub_items_attr="lines",
slice_data_retriever=lambda record, stream_slice: {
"invoice_id": stream_slice["parent"]["id"],
"invoice_created": stream_slice["parent"]["created"],
Expand All @@ -582,6 +565,13 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
cursor_field="created",
**args,
),
CustomerBalanceTransactions(
name="customer_balance_transactions",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice['parent']['id']}/balance_transactions",
parents=[invoices, self.customers(**args)],
cursor_field="created",
**args,
),
StripeSubStream(
name="usage_records",
path=lambda self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
#

import copy
import hashlib
import math
import os
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from itertools import chain
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import pendulum
import requests

from airbyte_cdk import BackoffStrategy, StreamSlice
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import SyncMode, AirbyteMessage, AirbyteStateBlob, AirbyteStreamState, AirbyteStateType, AirbyteStateMessage, StreamDescriptor, Type as MessageType
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ExponentialBackoffStrategy
from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.streams.checkpoint.resumable_full_refresh_cursor import ResumableFullRefreshCursor
Expand All @@ -25,7 +27,6 @@
from source_stripe.error_handlers import ParentIncrementalStripeSubStreamErrorHandler, StripeErrorHandler
from source_stripe.error_mappings import PARENT_INCREMENTAL_STRIPE_SUB_STREAM_ERROR_MAPPING


STRIPE_API_VERSION = "2022-11-15"
CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
IS_TESTING = os.environ.get("DEPLOYMENT_MODE") == "testing"
Expand Down Expand Up @@ -702,6 +703,12 @@ def __init__(
legacy_cursor_field=legacy_cursor_field,
event_types=event_types,
response_filter=response_filter,
record_extractor=UpdatedCursorIncrementalRecordExtractor(
cursor_field=cursor_field,
legacy_cursor_field=legacy_cursor_field,
response_filter=response_filter,
slice_data_retriever=kwargs.get("slice_data_retriever"),
),
**kwargs,
)
self.lazy_substream = StripeLazySubStream(
Expand Down Expand Up @@ -808,3 +815,106 @@ def stream_slices(
)
else:
yield from UpdatedCursorIncrementalStripeStream.stream_slices(self, sync_mode, cursor_field, stream_state)

class CustomerBalanceTransactions(ParentIncrementalStripeSubStream):
    """
    Customer balance transactions, sliced by customer IDs gathered from several parent streams.

    Each parent stream is read in turn. For every parent record the customer ID is taken from
    the record's ``customer`` field, falling back to its ``id`` field, and one slice is emitted
    per distinct customer whose ``balance`` (or ``total``) is non-zero.

    Records are filtered with a 2 day lookback window behind the cursor, and the cursor written
    by ``get_updated_state`` is never older than 7 days before the current time.

    API docs: https://stripe.com/docs/api/customer_balance_transactions/list
    """

    # Records created up to this many seconds before the cursor are still emitted.
    LOOKBACK_SECONDS = int(timedelta(days=2).total_seconds())
    # The persisted cursor is never allowed to be older than this many seconds.
    MAX_CURSOR_AGE_SECONDS = int(timedelta(days=7).total_seconds())

    def __init__(self, cursor_field: str, parents: List[StripeSubStream], *args, **kwargs):
        """
        :param cursor_field: name of the record field used as the cursor
        :param parents: parent streams to collect customer IDs from; the first one is also
            handed to the base class as its single ``parent``
        """
        super().__init__(*args, cursor_field=cursor_field, parent=parents[0], **kwargs)
        self.parent_streams = parents

    @property
    def state_checkpoint_interval(self) -> int:
        return 1  # force state write

    @staticmethod
    def _utc_now_timestamp() -> int:
        """Return the current Unix timestamp in whole seconds."""
        # `datetime.utcnow().timestamp()` treats the naive value as *local* time and is
        # therefore skewed by the host's UTC offset; `time.time()` is always epoch based.
        import time

        return int(time.time())

    def stream_slices(self, sync_mode: SyncMode, cursor_field=None, stream_state=None):
        """
        Yield one ``{"parent": {"id": <customer id>}}`` slice per distinct customer found in the
        parent streams, or a single placeholder slice when no customer qualified.
        """
        # With no incoming state the parents are read with an empty state, not a normalized one.
        normalized_state = self.normalize_state(stream_state) if stream_state else {}

        seen = set()

        for parent in self.parent_streams:
            self.logger.info(f"Starting parent stream {parent.name} with state {normalized_state}")
            slices = parent.stream_slices(sync_mode=sync_mode, cursor_field=parent.cursor_field, stream_state=normalized_state)

            for stream_slice in slices:
                records = parent.read_records(
                    sync_mode=sync_mode,
                    cursor_field=parent.cursor_field,
                    stream_slice=stream_slice,
                    stream_state=normalized_state,
                )
                for record in records:
                    parent_id = record.get("customer") or record.get("id")
                    balance = record.get("balance") or record.get("total", 0)
                    if parent_id and parent_id not in seen and balance != 0:
                        seen.add(parent_id)
                        yield {"parent": {"id": parent_id}}

        if not seen:
            # Placeholder slice so that at least one slice is always produced.
            yield {"parent": {"id": "empty_slice"}}

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
        """Yield the base stream's records created after the cursor minus the lookback window."""
        state = self.normalize_state(stream_state)
        lookback = state["cursor"] - self.LOOKBACK_SECONDS

        for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
            if record.get("created", 0) > lookback:
                yield record

    def read(self, *args, **kwargs):
        """
        Pass through everything the base ``read`` yields and, in one specific case, append a
        synthetic STATE message whose cursor is derived from the current time.
        """
        read_count = 0

        for record in super().read(*args, **kwargs):
            read_count += 1
            yield record

        # NOTE(review): the synthetic state is emitted only when exactly one item was yielded
        # above - confirm this is intended (as opposed to `== 0` or unconditionally).
        if read_count == 1:
            synthetic = {self.cursor_field: self._utc_now_timestamp()}
            state = self.normalize_state(kwargs.get("stream_state"))
            new_state = self.get_updated_state(state, synthetic)
            self.logger.info(f"Persisting synthetic state: {new_state}")
            yield AirbyteMessage(
                type=MessageType.STATE,
                state=AirbyteStateMessage(
                    type=AirbyteStateType.STREAM,
                    stream=AirbyteStreamState(
                        stream_descriptor=StreamDescriptor(name=self.name),
                        stream_state=AirbyteStateBlob(data=new_state),
                    ),
                ),
            )

    def normalize_state(self, state: Optional[Mapping[str, Any]]) -> dict:
        """
        Unifies legacy and new state format.

        :param state: raw stream state, optionally wrapped under a ``"data"`` key
        :return: ``{"cursor": <int>, "parents": {<parent name>: <int>}}``; every value falls
            back to ``self.start_date`` when no matching key is present
        """
        state = (state or {}).get("data", state or {})
        # An explicit null under "parents" is treated the same as a missing key.
        parents_state = state.get("parents") or {}
        return {
            "cursor": state.get(f"{self.name}_cursor_created") or state.get("created") or state.get("cursor") or self.start_date,
            "parents": {
                p.name: state.get(f"{p.name}_cursor_updated")
                or state.get("updated")
                or state.get("created")
                or parents_state.get(p.name)
                or self.start_date
                for p in self.parent_streams
            },
        }

    def get_updated_state(self, current_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Compute the next state from the current state and the latest record.

        The new cursor is the largest of: the previous cursor minus the lookback window, the
        record's cursor value minus the lookback window, and the current time minus the maximum
        cursor age. The same value is written for every parent stream.
        """
        candidates = [
            (current_state or {}).get("cursor", self.start_date) - self.LOOKBACK_SECONDS,
            self._utc_now_timestamp() - self.MAX_CURSOR_AGE_SECONDS,
        ]
        latest_cursor = latest_record.get(self.cursor_field)
        if latest_cursor is not None:
            # A record without the cursor field simply does not move the cursor.
            candidates.append(latest_cursor - self.LOOKBACK_SECONDS)
        new_cursor = max(candidates)

        return {
            "cursor": new_cursor,
            "parents": {p.name: new_cursor for p in self.parent_streams},
        }
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name,
"id": "sub_1OApco2eZvKYlo2CEDCzwLrE",
"object": "subscription",
"created": 1699603174,
"updated": 1699603174,
"items": {
"object": "list",
"data": [
Expand Down Expand Up @@ -786,6 +787,7 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name,
"id": "si_OynPdzMZykmCWm",
"object": "subscription_item",
"created": 1699603884,
#"updated": 1699603174,
"quantity": 2,
"subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE",
}
Expand All @@ -796,14 +798,6 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name,
stream = stream_by_name("subscription_items", config)
records = read_from_stream(stream, "full_refresh", {})
assert records == [
{
"id": "si_OynDmET1kQPTbI",
"object": "subscription_item",
"created": 1699603175,
"quantity": 1,
"subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE",
"subscription_updated": 1699603174, # 1699603175
},
{
"id": "si_OynPdzMZykmCWm",
"object": "subscription_item",
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 5.8.7 | 2024-12-20 | [49976](https://github.com/airbytehq/airbyte/pull/49976) | Source-Stripe: Refactor Customer Balance Transactions (Remove updated from schema) |
| 5.8.6 | 2025-02-15 | [54067](https://github.com/airbytehq/airbyte/pull/54067) | Update dependencies |
| 5.8.5 | 2025-02-08 | [52018](https://github.com/airbytehq/airbyte/pull/52018) | Update dependencies |
| 5.8.4 | 2025-02-03 | [49940](https://github.com/airbytehq/airbyte/pull/49940) | Update CDK version |
Expand Down
Loading