diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml index dad9899af3327..9faf63d49cf56 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/manifest.yaml @@ -1065,100 +1065,44 @@ definitions: types[]: '{{["person.created", "person.updated", "person.deleted"]}}' invoice_line_items: - type: StateDelegatingStream + $ref: "#/definitions/entity_stream" $parameters: name: invoice_line_items - full_refresh_stream: - type: DeclarativeStream - primary_key: - - id - schema_loader: - type: JsonFileSchemaLoader - file_path: "./source_stripe/schemas/{{ parameters['name'] }}.json" - retriever: - type: SimpleRetriever - requester: - $ref: "#/definitions/base_requester" - path: invoices/{{stream_partition.invoice_id}}/lines - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: - - data - transform_before_filtering: true - schema_normalization: Default - paginator: - $ref: "#/definitions/base_paginator" - partition_router: - type: SubstreamPartitionRouter - parent_stream_configs: - - type: ParentStreamConfig - parent_key: id - partition_field: invoice_id - extra_fields: - - ["id"] - - ["created"] - - ["updated"] - lazy_read_pointer: ["lines"] - stream: - $ref: "#/definitions/streams/invoices" - incremental_sync: - $ref: "#/definitions/entity_single_slice_cursor" - cursor_field: invoice_updated - global_substream_cursor: true - transformations: - - type: AddFields - fields: - - path: - - invoice_id - value: "{{ stream_slice.extra_fields['id']}}" - value_type: integer - - type: AddFields - fields: - - path: - - invoice_created - value: "{{ stream_slice.extra_fields['created'] | int}}" - value_type: integer - - type: AddFields - fields: - - path: - - invoice_updated - value: "{{ stream_slice.extra_fields['updated'] | int}}" - value_type: integer - incremental_stream: - $ref: "#/definitions/events_based_stream" - incremental_sync: - $ref: "#/definitions/entity_single_slice_cursor" - cursor_field: invoice_updated - retriever: - $ref: "#/definitions/events_objects_retriever" - $parameters: - request_parameters: - types[]: '{{["invoice.created", "invoice.deleted", "invoice.updated"]}}' - transformations: - - type: AddFields - fields: - - path: - - data - - object - - is_deleted - value: "{{ True }}" - condition: "{{ record.get('data', {}).get('object', False) and record.get('type', '').endswith('.deleted') }}" - - type: AddFields - fields: - - path: - - data - - object - - invoice_updated - value: "{{ record.get('updated', record.get('created', now_utc().timestamp())) | int }}" - value_type: integer - condition: "{{ record.get('data', {}).get('object', False) }}" - - type: DpathFlattenFields - field_path: - - data - - object - replace_record: true + retriever: + $ref: "#/definitions/base_retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - type: ParentStreamConfig + parent_key: id + partition_field: invoice_id + stream: + $ref: "#/definitions/streams/invoices" + incremental_dependency: true + $parameters: + path: invoices/{{ stream_partition.invoice_id }}/lines + # This stream is not truly incremental on the child level. + # We're configuring it this way to support incremental syncs on the parent only, + # which helps reduce the size of the state. + incremental_sync: + $ref: "#/definitions/entity_single_slice_cursor" + cursor_field: created + global_substream_cursor: true + transformations: + - type: AddFields + fields: + - path: + - invoice_id + value: "{{ stream_slice.extra_fields['id'] | int}}" + value_type: integer + - path: + - invoice_created + value: "{{ stream_slice.extra_fields['created'] | int}}" + value_type: integer + - path: + - invoice_updated + value: "{{ stream_slice.extra_fields['updated'] | int}}" + value_type: integer application_fees_refunds: type: StateDelegatingStream @@ -1216,88 +1160,38 @@ definitions: request_parameters: "{{ {'type': 'application_fee.refund.updated'} }}" subscription_items: - type: StateDelegatingStream + $ref: "#/definitions/entity_stream" $parameters: name: subscription_items - full_refresh_stream: - type: DeclarativeStream - primary_key: - - id - schema_loader: - type: JsonFileSchemaLoader - file_path: "./source_stripe/schemas/{{ parameters['name'] }}.json" - retriever: - type: SimpleRetriever - requester: - $ref: "#/definitions/base_requester" - path: subscription_items - request_parameters: - subscription: "{{stream_partition.subscription_id}}" - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: - - data - transform_before_filtering: true - schema_normalization: Default - paginator: - $ref: "#/definitions/base_paginator" - partition_router: - type: SubstreamPartitionRouter - parent_stream_configs: - - type: ParentStreamConfig - parent_key: id - partition_field: subscription_id - lazy_read_pointer: ["items"] - extra_fields: - - ["updated"] - stream: - $ref: "#/definitions/streams/subscriptions" - incremental_sync: - $ref: "#/definitions/entity_single_slice_cursor" - cursor_field: subscription_updated - global_substream_cursor: true - transformations: - - type: AddFields - fields: - - path: - - subscription_updated - value: "{{ stream_slice.extra_fields['updated'] | int}}" - value_type: integer - incremental_stream: - $ref: "#/definitions/events_based_stream" - incremental_sync: - $ref: "#/definitions/events_read_slice_cursor" - cursor_field: subscription_updated - retriever: - $ref: "#/definitions/events_objects_retriever" - $parameters: - request_parameters: - types[]: '{{["customer.subscription.created", "customer.subscription.updated"]}}' - transformations: - - type: AddFields - fields: - - path: - - data - - object - - is_deleted - value: "{{ True }}" - condition: "{{ record.get('data', {}).get('object', False) and record.get('type', '').endswith('.deleted') }}" - - type: AddFields - fields: - - path: - - data - - object - - subscription_updated - value: "{{ record.get('updated', record.get('created', now_utc().timestamp())) | int }}" - value_type: integer - condition: "{{ record.get('data', {}).get('object', False) }}" - - type: DpathFlattenFields - field_path: - - data - - object - replace_record: true + retriever: + $ref: "#/definitions/base_retriever" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - type: ParentStreamConfig + parent_key: id + partition_field: subscription_id + stream: + $ref: "#/definitions/streams/subscriptions" + incremental_dependency: true + $parameters: + path: subscription_items + request_parameters: + subscription: "{{stream_partition.subscription_id}}" + # This stream is not truly incremental on the child level. + # We're configuring it this way to support incremental syncs on the parent only, + # which helps reduce the size of the state. + incremental_sync: + $ref: "#/definitions/entity_single_slice_cursor" + cursor_field: created + global_substream_cursor: true + transformations: + - type: AddFields + fields: + - path: + - subscription_updated + value: "{{ stream_slice.extra_fields['updated'] | int}}" + value_type: integer bank_accounts: type: StateDelegatingStream