Skip to content

🐛Source-Stripe - Changed Invoice Line Items and Subscription Items incremental type #58612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1065,100 +1065,44 @@ definitions:
types[]: '{{["person.created", "person.updated", "person.deleted"]}}'

invoice_line_items:
type: StateDelegatingStream
$ref: "#/definitions/entity_stream"
$parameters:
name: invoice_line_items
full_refresh_stream:
type: DeclarativeStream
primary_key:
- id
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_stripe/schemas/{{ parameters['name'] }}.json"
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: invoices/{{stream_partition.invoice_id}}/lines
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path:
- data
transform_before_filtering: true
schema_normalization: Default
paginator:
$ref: "#/definitions/base_paginator"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: invoice_id
extra_fields:
- ["id"]
- ["created"]
- ["updated"]
lazy_read_pointer: ["lines"]
stream:
$ref: "#/definitions/streams/invoices"
incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: invoice_updated
global_substream_cursor: true
transformations:
- type: AddFields
fields:
- path:
- invoice_id
value: "{{ stream_slice.extra_fields['id']}}"
value_type: integer
- type: AddFields
fields:
- path:
- invoice_created
value: "{{ stream_slice.extra_fields['created'] | int}}"
value_type: integer
- type: AddFields
fields:
- path:
- invoice_updated
value: "{{ stream_slice.extra_fields['updated'] | int}}"
value_type: integer
incremental_stream:
$ref: "#/definitions/events_based_stream"
incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: invoice_updated
retriever:
$ref: "#/definitions/events_objects_retriever"
$parameters:
request_parameters:
types[]: '{{["invoice.created", "invoice.deleted", "invoice.updated"]}}'
transformations:
- type: AddFields
fields:
- path:
- data
- object
- is_deleted
value: "{{ True }}"
condition: "{{ record.get('data', {}).get('object', False) and record.get('type', '').endswith('.deleted') }}"
- type: AddFields
fields:
- path:
- data
- object
- invoice_updated
value: "{{ record.get('updated', record.get('created', now_utc().timestamp())) | int }}"
value_type: integer
condition: "{{ record.get('data', {}).get('object', False) }}"
- type: DpathFlattenFields
field_path:
- data
- object
replace_record: true
retriever:
$ref: "#/definitions/base_retriever"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: invoice_id
stream:
$ref: "#/definitions/streams/invoices"
incremental_dependency: true
$parameters:
path: invoices/{{ stream_partition.invoice_id }}/lines
# This stream is not truly incremental on the child level.
# We're configuring it this way to support incremental syncs on the parent only,
# which helps reduce the size of the state.
incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: created
global_substream_cursor: true
transformations:
- type: AddFields
fields:
- path:
- invoice_id
value: "{{ stream_slice.extra_fields['id'] | int}}"
value_type: integer
- path:
- invoice_created
value: "{{ stream_slice.extra_fields['created'] | int}}"
value_type: integer
- path:
- invoice_updated
value: "{{ stream_slice.extra_fields['updated'] | int}}"
value_type: integer

application_fees_refunds:
type: StateDelegatingStream
Expand Down Expand Up @@ -1216,88 +1160,38 @@ definitions:
request_parameters: "{{ {'type': 'application_fee.refund.updated'} }}"

subscription_items:
type: StateDelegatingStream
$ref: "#/definitions/entity_stream"
$parameters:
name: subscription_items
full_refresh_stream:
type: DeclarativeStream
primary_key:
- id
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_stripe/schemas/{{ parameters['name'] }}.json"
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: subscription_items
request_parameters:
subscription: "{{stream_partition.subscription_id}}"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path:
- data
transform_before_filtering: true
schema_normalization: Default
paginator:
$ref: "#/definitions/base_paginator"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: subscription_id
lazy_read_pointer: ["items"]
extra_fields:
- ["updated"]
stream:
$ref: "#/definitions/streams/subscriptions"
incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: subscription_updated
global_substream_cursor: true
transformations:
- type: AddFields
fields:
- path:
- subscription_updated
value: "{{ stream_slice.extra_fields['updated'] | int}}"
value_type: integer
incremental_stream:
$ref: "#/definitions/events_based_stream"
incremental_sync:
$ref: "#/definitions/events_read_slice_cursor"
cursor_field: subscription_updated
retriever:
$ref: "#/definitions/events_objects_retriever"
$parameters:
request_parameters:
types[]: '{{["customer.subscription.created", "customer.subscription.updated"]}}'
transformations:
- type: AddFields
fields:
- path:
- data
- object
- is_deleted
value: "{{ True }}"
condition: "{{ record.get('data', {}).get('object', False) and record.get('type', '').endswith('.deleted') }}"
- type: AddFields
fields:
- path:
- data
- object
- subscription_updated
value: "{{ record.get('updated', record.get('created', now_utc().timestamp())) | int }}"
value_type: integer
condition: "{{ record.get('data', {}).get('object', False) }}"
- type: DpathFlattenFields
field_path:
- data
- object
replace_record: true
retriever:
$ref: "#/definitions/base_retriever"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: subscription_id
stream:
$ref: "#/definitions/streams/subscriptions"
incremental_dependency: true
$parameters:
path: subscription_items
request_parameters:
subscription: "{{stream_partition.subscription_id}}"
# This stream is not truly incremental on the child level.
# We're configuring it this way to support incremental syncs on the parent only,
# which helps reduce the size of the state.
incremental_sync:
$ref: "#/definitions/entity_single_slice_cursor"
cursor_field: created
global_substream_cursor: true
transformations:
- type: AddFields
fields:
- path:
- subscription_updated
value: "{{ stream_slice.extra_fields['updated'] | int}}"
value_type: integer

bank_accounts:
type: StateDelegatingStream
Expand Down
Loading