Skip to content

Commit 9508f61

Browse files
🎉 Source Shopify: implement BalanceTransactions stream (#10204)
1 parent 103dd2a commit 9508f61

File tree

11 files changed

+143
-18
lines changed

11 files changed

+143
-18
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@
704704
- name: Shopify
705705
sourceDefinitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
706706
dockerRepository: airbyte/source-shopify
707-
dockerImageTag: 0.1.35
707+
dockerImageTag: 0.1.36
708708
documentationUrl: https://docs.airbyte.io/integrations/sources/shopify
709709
icon: shopify.svg
710710
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7430,7 +7430,7 @@
74307430
supportsNormalization: false
74317431
supportsDBT: false
74327432
supported_destination_sync_modes: []
7433-
- dockerImage: "airbyte/source-shopify:0.1.35"
7433+
- dockerImage: "airbyte/source-shopify:0.1.36"
74347434
spec:
74357435
documentationUrl: "https://docs.airbyte.io/integrations/sources/shopify"
74367436
connectionSpecification:

airbyte-integrations/connectors/source-shopify/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ COPY source_shopify ./source_shopify
2828
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
2929
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3030

31-
LABEL io.airbyte.version=0.1.35
31+
LABEL io.airbyte.version=0.1.36
3232
LABEL io.airbyte.name=airbyte/source-shopify

airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ tests:
2525
timeout_seconds: 3600
2626
# some streams hold data only for some time, therefore certain streams could be empty while sync.
2727
# 'abandoned_checkouts' stream holds data up to 1 month.
28-
empty_streams: ["abandoned_checkouts"]
28+
empty_streams: ["abandoned_checkouts", "balance_transactions"]
2929
incremental:
3030
- config_path: "secrets/config.json"
3131
configured_catalog_path: "integration_tests/configured_catalog.json"

airbyte-integrations/connectors/source-shopify/integration_tests/abnormal_state.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,8 @@
7777
"orders": {
7878
"updated_at": "2025-03-03T03:47:46-08:00"
7979
}
80+
},
81+
"balance_transactions": {
82+
"id": 9999999999999
8083
}
8184
}

airbyte-integrations/connectors/source-shopify/integration_tests/configured_catalog.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,18 @@
249249
"sync_mode": "incremental",
250250
"cursor_field": ["updated_at"],
251251
"destination_sync_mode": "append"
252+
},
253+
{
254+
"stream": {
255+
"name": "balance_transactions",
256+
"json_schema": {},
257+
"supported_sync_modes": ["incremental", "full_refresh"],
258+
"source_defined_cursor": true,
259+
"default_cursor_field": ["id"]
260+
},
261+
"sync_mode": "incremental",
262+
"cursor_field": ["id"],
263+
"destination_sync_mode": "append"
252264
}
253265
]
254266
}

airbyte-integrations/connectors/source-shopify/integration_tests/state.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,8 @@
7777
"orders": {
7878
"updated_at": "2022-03-03T03:47:46-08:00"
7979
}
80+
},
81+
"balance_transactions": {
82+
"id": 29427031703741
8083
}
8184
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"type": ["null", "object"],
3+
"properties": {
4+
"id": {
5+
"type": "integer"
6+
},
7+
"type": {
8+
"type": ["null", "string"]
9+
},
10+
"test": {
11+
"type": ["null", "boolean"]
12+
},
13+
"payout_id": {
14+
"type": ["null", "integer"]
15+
},
16+
"payout_status": {
17+
"type": ["null", "string"]
18+
},
19+
"payoucurrencyt_status": {
20+
"type": ["null", "string"]
21+
},
22+
"amount": {
23+
"type": ["null", "number"]
24+
},
25+
"fee": {
26+
"type": ["null", "number"]
27+
},
28+
"net": {
29+
"type": ["null", "number"]
30+
},
31+
"source_id": {
32+
"type": ["null", "integer"]
33+
},
34+
"source_type": {
35+
"type": ["null", "string"]
36+
},
37+
"source_order_transaction_id": {
38+
"type": ["null", "integer"]
39+
},
40+
"source_order_id": {
41+
"type": ["null", "integer"]
42+
},
43+
"processed_at": {
44+
"type": ["null", "string"],
45+
"format": "date-time"
46+
},
47+
"shop_url": {
48+
"type": ["null", "string"]
49+
}
50+
}
51+
52+
}

airbyte-integrations/connectors/source-shopify/source_shopify/source.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ def __init__(self, config: Dict):
3838
@property
3939
def url_base(self) -> str:
4040
return f"https://{self.config['shop']}.myshopify.com/admin/api/{self.api_version}/"
41+
42+
@property
43+
def default_filter_field_value(self) -> Union[int, str]:
44+
# certain streams are using `since_id` field as `filter_field`, which requires to use `int` type,
45+
# but many other use `str` values for this, we determine what to use based on `filter_field` value
46+
# by default, we use the user defined `Start Date` as initial value, or 0 for `id`-dependent streams.
47+
return 0 if self.filter_field == "since_id" else self.config["start_date"]
4148

4249
@staticmethod
4350
def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
@@ -53,7 +60,7 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
5360
params.update(**next_page_token)
5461
else:
5562
params["order"] = f"{self.order_field} asc"
56-
params[self.filter_field] = self.config["start_date"]
63+
params[self.filter_field] = self.default_filter_field_value
5764
return params
5865

5966
@limiter.balance_rate_limit()
@@ -92,16 +99,16 @@ def state_checkpoint_interval(self) -> int:
9299
cursor_field = "updated_at"
93100

94101
@property
95-
def default_comparison_value(self) -> Union[int, str]:
102+
def default_state_comparison_value(self) -> Union[int, str]:
96103
# certain streams are using `id` field as `cursor_field`, which requires to use `int` type,
97104
# but many other use `str` values for this, we determine what to use based on `cursor_field` value
98105
return 0 if self.cursor_field == "id" else ""
99106

100107
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
101108
return {
102109
self.cursor_field: max(
103-
latest_record.get(self.cursor_field, self.default_comparison_value),
104-
current_stream_state.get(self.cursor_field, self.default_comparison_value),
110+
latest_record.get(self.cursor_field, self.default_state_comparison_value),
111+
current_stream_state.get(self.cursor_field, self.default_state_comparison_value),
105112
)
106113
}
107114

@@ -307,25 +314,31 @@ class Collects(IncrementalShopifyStream):
307314
The Collect stream is the link between Products and Collections, if the Collection is created for Products,
308315
the `collect` record is created, it's reasonable to Full Refresh all collects. As for Incremental refresh -
309316
we would use the since_id specificaly for this stream.
310-
311317
"""
312318

313319
data_field = "collects"
314320
cursor_field = "id"
315321
order_field = "id"
316322
filter_field = "since_id"
317-
323+
318324
def path(self, **kwargs) -> str:
319325
return f"{self.data_field}.json"
320326

321-
def request_params(
322-
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
323-
) -> MutableMapping[str, Any]:
324-
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
325-
# If there is a next page token then we should only send pagination-related parameters.
326-
if not next_page_token and not stream_state:
327-
params[self.filter_field] = 0
328-
return params
327+
328+
class BalanceTransactions(IncrementalShopifyStream):
329+
330+
"""
331+
PaymentsTransactions stream does not support Incremental Refresh based on datetime fields, only `since_id` is supported:
332+
https://shopify.dev/api/admin-rest/2021-07/resources/transactions
333+
"""
334+
335+
data_field = "transactions"
336+
cursor_field = "id"
337+
order_field = "id"
338+
filter_field = "since_id"
339+
340+
def path(self, **kwargs) -> str:
341+
return f"shopify_payments/balance/{self.data_field}.json"
329342

330343

331344
class OrderRefunds(ShopifySubstream):
@@ -514,6 +527,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
514527
OrderRisks(config),
515528
TenderTransactions(config),
516529
Transactions(config),
530+
BalanceTransactions(config),
517531
Pages(config),
518532
PriceRules(config),
519533
DiscountCodes(config),

airbyte-integrations/connectors/source-shopify/source_shopify/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"read_locations": ["Locations"],
2121
"read_inventory": ["InventoryItems", "InventoryLevels"],
2222
"read_merchant_managed_fulfillment_orders": ["FulfillmentOrders"],
23+
"read_shopify_payments_payouts": ["BalanceTransactions"],
2324
}
2425

2526

docs/integrations/sources/shopify.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,45 @@ This source can sync data for the [Shopify API](https://help.shopify.com/en/api/
1313

1414
This Source Connector is based on a [Airbyte CDK](https://docs.airbyte.io/connector-development/cdk-python).
1515

16+
## Troubleshooting
17+
18+
Check out common troubleshooting issues for the BigQuery destination connector on our Discourse [here](https://discuss.airbyte.io/tags/c/connector/11/source-shopify).
19+
20+
### Output schema
21+
22+
This Source is capable of syncing the following core Streams:
23+
24+
* [Abandoned Checkouts](https://help.shopify.com/en/api/reference/orders/abandoned_checkouts)
25+
* [Collects](https://help.shopify.com/en/api/reference/products/collect)
26+
* [Custom Collections](https://help.shopify.com/en/api/reference/products/customcollection)
27+
* [Customers](https://help.shopify.com/en/api/reference/customers)
28+
* [Draft Orders](https://help.shopify.com/en/api/reference/orders/draftorder)
29+
* [Discount Codes](https://shopify.dev/docs/admin-api/rest/reference/discounts/discountcode)
30+
* [Metafields](https://help.shopify.com/en/api/reference/metafield)
31+
* [Orders](https://help.shopify.com/en/api/reference/orders)
32+
* [Orders Refunds](https://shopify.dev/api/admin/rest/reference/orders/refund)
33+
* [Orders Risks](https://shopify.dev/api/admin/rest/reference/orders/order-risk)
34+
* [Products](https://help.shopify.com/en/api/reference/products)
35+
* [Transactions](https://help.shopify.com/en/api/reference/orders/transaction)
36+
* [Balance Transactions](https://shopify.dev/api/admin-rest/2021-07/resources/transactions)
37+
* [Pages](https://help.shopify.com/en/api/reference/online-store/page)
38+
* [Price Rules](https://help.shopify.com/en/api/reference/discounts/pricerule)
39+
* [Locations](https://shopify.dev/api/admin-rest/2021-10/resources/location)
40+
* [InventoryItems](https://shopify.dev/api/admin-rest/2021-10/resources/inventoryItem)
41+
* [InventoryLevels](https://shopify.dev/api/admin-rest/2021-10/resources/inventorylevel)
42+
* [Fulfillment Orders](https://shopify.dev/api/admin-rest/2021-07/resources/fulfillmentorder)
43+
* [Fulfillments](https://shopify.dev/api/admin-rest/2021-07/resources/fulfillment)
44+
* [Shop](https://shopify.dev/api/admin-rest/2021-07/resources/shop)
45+
46+
#### NOTE:
47+
48+
For better experience with `Incremental Refresh` the following is recommended:
49+
50+
* `Order Refunds`, `Order Risks`, `Transactions` should be synced along with `Orders` stream.
51+
* `Discount Codes` should be synced along with `Price Rules` stream.
52+
53+
If child streams are synced alone from the parent stream - the full sync will take place, and the records are filtered out afterwards.
54+
1655
### Data type mapping
1756

1857
| Integration Type | Airbyte Type |
@@ -100,6 +139,7 @@ This is expected when the connector hits the 429 - Rate Limit Exceeded HTTP Erro
100139

101140
| Version | Date | Pull Request | Subject |
102141
| :--- | :--- | :--- | :--- |
142+
| 0.1.36 | 2022-03-22 | [9850](https://github.com/airbytehq/airbyte/pull/9850) | Added `BalanceTransactions` stream |
103143
| 0.1.35 | 2022-03-07 | [10915](https://github.com/airbytehq/airbyte/pull/10915) | Fix a bug which caused `full-refresh` syncs of child REST entities configured for `incremental` |
104144
| 0.1.34 | 2022-03-02 | [10794](https://github.com/airbytehq/airbyte/pull/10794) | Minor specification re-order, fixed links in documentation |
105145
| 0.1.33 | 2022-02-17 | [10419](https://github.com/airbytehq/airbyte/pull/10419) | Fixed wrong field type for tax_exemptions for `Abandoned_checkouts` stream |

0 commit comments

Comments
 (0)