Skip to content

Commit e6a98e1

Browse files
authored
šŸ› Source Shopify: Fix FAILED scenario for BULK streams, fixed deleted events STATE collision (#42973)
1 parent 91ac3bb commit e6a98e1

File tree

13 files changed

+192
-53
lines changed

13 files changed

+192
-53
lines changed

airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml

-5
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@ acceptance_tests:
44
spec:
55
tests:
66
- spec_path: "source_shopify/spec.json"
7-
backward_compatibility_tests_config:
8-
# This is the intentional change.
9-
# Added new fields: `job_checkpoint_interval`, `job_product_variants_include_pres_prices`
10-
# to provide the ability to override this value by the User.
11-
disable_for_version: 2.4.14
127
connection:
138
tests:
149
- config_path: "secrets/config.json"

airbyte-integrations/connectors/source-shopify/integration_tests/expected_records.jsonl

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
{"stream": "metafield_orders", "data": {"id": 22347287855293, "namespace": "my_fields", "value": "trtrtr", "key": "purchase_order", "description": null, "created_at": "2023-04-13T12:09:08+00:00", "updated_at": "2023-04-13T12:09:08+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22347287855293", "owner_id": 4147980107965, "owner_resource": "order", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953849754}
4949
{"stream": "metafield_pages", "data": {"id": 22365711499453, "namespace": "custom", "key": "test_page_metafield", "value": "Test Page Metafield", "description": null, "owner_id": 93795909821, "created_at": "2023-04-14T03:21:49-07:00", "updated_at": "2023-04-14T03:21:49-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365711499453", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953851755}
5050
{"stream": "metafield_pages", "data": {"id": 22534014828733, "namespace": "new_metafield", "key": "new_metafield", "value": "updated_mon_24.04.2023", "description": null, "owner_id": 83074252989, "created_at": "2023-04-24T11:08:41-07:00", "updated_at": "2023-04-24T11:08:41-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22534014828733", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953852192}
51-
{"stream": "metafield_product_images", "data": {"id": 22365851517117, "namespace": "my_fields", "value": "natural coton", "key": "liner_material", "description": null, "created_at": "2023-04-14T11:59:27+00:00", "updated_at": "2023-04-14T11:59:27+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365851517117", "owner_id": 29301295481021, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953865505}
52-
{"stream": "metafield_product_images", "data": {"id": 22533588451517, "namespace": "new_metafield", "value": "updated_mon_24.04.2023", "key": "new_metafield", "description": null, "created_at": "2023-04-24T17:32:19+00:00", "updated_at": "2023-04-24T17:32:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22533588451517", "owner_id": 29301297316029, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953865509}
51+
{"stream":"metafield_product_images","data":{"id":22365851517117,"namespace":"my_fields","value":"natural coton","key":"liner_material","description":null,"created_at":"2023-04-14T11:59:27+00:00","updated_at":"2024-07-31T14:19:45+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22365851517117","owner_id":21562154025149,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
52+
{"stream":"metafield_product_images","data":{"id":22533588451517,"namespace":"new_metafield","value":"updated_mon_24.04.2023","key":"new_metafield","description":null,"created_at":"2023-04-24T17:32:19+00:00","updated_at":"2024-07-31T14:19:58+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22533588451517","owner_id":21562155892925,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
5353
{"stream": "metafield_products", "data": {"id": 22365729718461, "namespace": "custom", "value": "Test Product Metafield", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T10:31:19+00:00", "updated_at": "2023-04-14T10:31:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365729718461", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872773}
5454
{"stream": "metafield_products", "data": {"id": 22365729816765, "namespace": "custom", "value": "gid://shopify/Product/6796229574845", "key": "test_product_metafield", "description": null, "created_at": "2023-04-14T10:31:29+00:00", "updated_at": "2023-04-14T10:31:29+00:00", "type": "product_reference", "admin_graphql_api_id": "gid://shopify/Metafield/22365729816765", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872775}
5555
{"stream": "metafield_products", "data": {"id": 22365772251325, "namespace": "custom", "value": "Test", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T11:04:46+00:00", "updated_at": "2023-04-14T11:04:46+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365772251325", "owner_id": 6796229574845, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872776}

airbyte-integrations/connectors/source-shopify/integration_tests/expected_records_transactions_with_user_id.jsonl

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
{"stream": "metafield_orders", "data": {"id": 22347287855293, "namespace": "my_fields", "value": "trtrtr", "key": "purchase_order", "description": null, "created_at": "2023-04-13T12:09:08+00:00", "updated_at": "2023-04-13T12:09:08+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22347287855293", "owner_id": 4147980107965, "owner_resource": "order", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953626962}
4949
{"stream": "metafield_pages", "data": {"id": 22365711499453, "namespace": "custom", "key": "test_page_metafield", "value": "Test Page Metafield", "description": null, "owner_id": 93795909821, "created_at": "2023-04-14T03:21:49-07:00", "updated_at": "2023-04-14T03:21:49-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365711499453", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953628925}
5050
{"stream": "metafield_pages", "data": {"id": 22534014828733, "namespace": "new_metafield", "key": "new_metafield", "value": "updated_mon_24.04.2023", "description": null, "owner_id": 83074252989, "created_at": "2023-04-24T11:08:41-07:00", "updated_at": "2023-04-24T11:08:41-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22534014828733", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953629370}
51-
{"stream": "metafield_product_images", "data": {"id": 22365851517117, "namespace": "my_fields", "value": "natural coton", "key": "liner_material", "description": null, "created_at": "2023-04-14T11:59:27+00:00", "updated_at": "2023-04-14T11:59:27+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365851517117", "owner_id": 29301295481021, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953637175}
52-
{"stream": "metafield_product_images", "data": {"id": 22533588451517, "namespace": "new_metafield", "value": "updated_mon_24.04.2023", "key": "new_metafield", "description": null, "created_at": "2023-04-24T17:32:19+00:00", "updated_at": "2023-04-24T17:32:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22533588451517", "owner_id": 29301297316029, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953637175}
51+
{"stream":"metafield_product_images","data":{"id":22365851517117,"namespace":"my_fields","value":"natural coton","key":"liner_material","description":null,"created_at":"2023-04-14T11:59:27+00:00","updated_at":"2024-07-31T14:19:45+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22365851517117","owner_id":21562154025149,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
52+
{"stream":"metafield_product_images","data":{"id":22533588451517,"namespace":"new_metafield","value":"updated_mon_24.04.2023","key":"new_metafield","description":null,"created_at":"2023-04-24T17:32:19+00:00","updated_at":"2024-07-31T14:19:58+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22533588451517","owner_id":21562155892925,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
5353
{"stream": "metafield_products", "data": {"id": 22365729718461, "namespace": "custom", "value": "Test Product Metafield", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T10:31:19+00:00", "updated_at": "2023-04-14T10:31:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365729718461", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644458}
5454
{"stream": "metafield_products", "data": {"id": 22365729816765, "namespace": "custom", "value": "gid://shopify/Product/6796229574845", "key": "test_product_metafield", "description": null, "created_at": "2023-04-14T10:31:29+00:00", "updated_at": "2023-04-14T10:31:29+00:00", "type": "product_reference", "admin_graphql_api_id": "gid://shopify/Metafield/22365729816765", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644459}
5555
{"stream": "metafield_products", "data": {"id": 22365772251325, "namespace": "custom", "value": "Test", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T11:04:46+00:00", "updated_at": "2023-04-14T11:04:46+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365772251325", "owner_id": 6796229574845, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644459}

airbyte-integrations/connectors/source-shopify/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: api
1212
connectorType: source
1313
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
14-
dockerImageTag: 2.4.16
14+
dockerImageTag: 2.4.17
1515
dockerRepository: airbyte/source-shopify
1616
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
1717
githubIssueLabel: source-shopify

airbyte-integrations/connectors/source-shopify/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.4.16"
6+
version = "2.4.17"
77
name = "source-shopify"
88
description = "Source CDK implementation for Shopify."
99
authors = [ "Airbyte <contact@airbyte.io>",]

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,9 @@ def _job_completed(self) -> bool:
216216
def _job_canceled(self) -> bool:
217217
return self._job_state == ShopifyBulkJobStatus.CANCELED.value
218218

219+
def _job_failed(self) -> bool:
220+
return self._job_state == ShopifyBulkJobStatus.FAILED.value
221+
219222
def _job_cancel(self) -> None:
220223
_, canceled_response = self.http_client.send_request(
221224
http_method="POST",
@@ -255,7 +258,9 @@ def _log_state(self, message: Optional[str] = None) -> None:
255258
def _job_get_result(self, response: Optional[requests.Response] = None) -> Optional[str]:
256259
parsed_response = response.json().get("data", {}).get("node", {}) if response else None
257260
# get `complete` or `partial` result from collected Bulk Job results
258-
job_result_url = parsed_response.get("url", parsed_response.get("partialDataUrl")) if parsed_response else None
261+
full_result_url = parsed_response.get("url") if parsed_response else None
262+
partial_result_url = parsed_response.get("partialDataUrl") if parsed_response else None
263+
job_result_url = full_result_url if full_result_url else partial_result_url
259264
if job_result_url:
260265
# save to local file using chunks to avoid OOM
261266
filename = self._tools.filename_from_url(job_result_url)
@@ -326,13 +331,13 @@ def _on_completed_job(self, response: Optional[requests.Response] = None) -> Non
326331
self._job_result_filename = self._job_get_result(response)
327332

328333
def _on_failed_job(self, response: requests.Response) -> AirbyteTracedException:
329-
if not self._job_any_lines_collected:
334+
if not self._supports_checkpointing:
330335
raise ShopifyBulkExceptions.BulkJobFailed(
331336
f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}",
332337
)
333338
else:
334339
# when the Bulk Job fails, usually there is a `partialDataUrl` available,
335-
# we leverage the checkpointing in this case
340+
# we leverage the checkpointing in this case.
336341
self._job_get_checkpointed_result(response)
337342

338343
def _on_timeout_job(self, **kwargs) -> AirbyteTracedException:
@@ -434,6 +439,8 @@ def _job_check_state(self) -> None:
434439
while not self._job_completed():
435440
if self._job_canceled():
436441
break
442+
elif self._job_failed():
443+
break
437444
else:
438445
self._job_track_running()
439446

airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py

+43-16
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ class MetafieldType(Enum):
203203
ORDERS = "orders"
204204
DRAFT_ORDERS = "draftOrders"
205205
PRODUCTS = "products"
206-
PRODUCT_IMAGES = ["products", "images"]
206+
PRODUCT_IMAGES = "products"
207207
PRODUCT_VARIANTS = "productVariants"
208208
COLLECTIONS = "collections"
209209
LOCATIONS = "locations"
@@ -491,25 +491,30 @@ class MetafieldProduct(Metafield):
491491
class MetafieldProductImage(Metafield):
492492
"""
493493
{
494-
products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
494+
products(query: "updated_at:>='2023-01-08T00:00:00+00:00' AND updated_at:<='2024-08-02T15:12:41.689153+00:00'", sortKey: UPDATED_AT) {
495495
edges {
496496
node {
497+
__typename
497498
id
498-
images{
499-
edges{
500-
node{
499+
media {
500+
edges {
501+
node {
502+
__typename
501503
id
502-
metafields {
503-
edges {
504-
node {
505-
id
506-
namespace
507-
value
508-
key
509-
description
510-
createdAt
511-
updatedAt
512-
type
504+
... on MediaImage {
505+
metafields {
506+
edges {
507+
node {
508+
__typename
509+
id
510+
namespace
511+
value
512+
key
513+
description
514+
createdAt
515+
updatedAt
516+
type
517+
}
513518
}
514519
}
515520
}
@@ -522,6 +527,28 @@ class MetafieldProductImage(Metafield):
522527
}
523528
"""
524529

530+
@property
531+
def query_nodes(self) -> List[Field]:
532+
"""
533+
This is the override for the default `query_nodes` method,
534+
because the usual way of retrieving the metafields for `product images` was suddenly deprecated,
535+
for `2024-10`, but the changes are reflected in the `2024-04` as well, starting: `2024-08-01T00:06:44`
536+
537+
More info here:
538+
https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed
539+
"""
540+
# define metafield node
541+
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
542+
media_fields: List[Field] = [
543+
"__typename",
544+
"id",
545+
InlineFragment(type="MediaImage", fields=[metafield_node]),
546+
]
547+
# define media node
548+
media_node = self.get_edge_node("media", media_fields)
549+
fields: List[Field] = ["__typename", "id", media_node]
550+
return fields
551+
525552
type = MetafieldType.PRODUCT_IMAGES
526553

527554

airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ def produce_deleted_records_from_events(self, delete_events: Iterable[Mapping[st
143143
yield {
144144
"id": event["subject_id"],
145145
self.cursor_field: event["created_at"],
146-
"updated_at": event["created_at"],
147146
"deleted_message": event["message"],
148147
"deleted_description": event["description"],
149148
"shop_url": event["shop_url"],
@@ -571,6 +570,10 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg
571570

572571

573572
class IncrementalShopifyStreamWithDeletedEvents(IncrementalShopifyStream):
573+
def __init__(self, config: Dict) -> None:
574+
self._stream_state: MutableMapping[str, Any] = {}
575+
super().__init__(config)
576+
574577
@property
575578
@abstractmethod
576579
def deleted_events_api_name(self) -> str:
@@ -607,13 +610,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
607610
"""
608611
We extend the stream state with `deleted` property to store the `destroyed` records STATE separately from the Stream State.
609612
"""
610-
state = super().get_updated_state(current_stream_state, latest_record)
613+
self._stream_state = super().get_updated_state(self._stream_state, latest_record)
611614
# add `deleted` property to each stream that supports `deleted events`,
612615
# to provide the `Incremental` sync mode, for the `Incremental Delete` records.
613616
last_deleted_record_value = latest_record.get(self.deleted_cursor_field) or self.default_deleted_state_comparison_value
614617
current_deleted_state_value = current_stream_state.get(self.deleted_cursor_field) or self.default_deleted_state_comparison_value
615-
state["deleted"] = {self.deleted_cursor_field: max(last_deleted_record_value, current_deleted_state_value)}
616-
return state
618+
self._stream_state["deleted"] = {self.deleted_cursor_field: max(last_deleted_record_value, current_deleted_state_value)}
619+
return self._stream_state
617620

618621
def read_records(
619622
self,

airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py

+29
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,35 @@ def bulk_job_failed_response():
358358
},
359359
}
360360

361+
362+
@pytest.fixture
363+
def bulk_job_failed_with_partial_url_response():
364+
return {
365+
"data": {
366+
"node": {
367+
"id": "gid://shopify/BulkOperation/123",
368+
"status": "FAILED",
369+
"errorCode": "INTERNAL_SERVER_ERROR",
370+
"objectCount": "432",
371+
"fileSize": None,
372+
"url": None,
373+
"partialDataUrl": 'https://some_url?response-content-disposition=attachment;+filename="bulk-123456789.jsonl";+filename*=UTF-8'
374+
"bulk-123456789.jsonl&response-content-type=application/jsonl"
375+
}
376+
},
377+
"extensions": {
378+
"cost": {
379+
"requestedQueryCost": 1,
380+
"actualQueryCost": 1,
381+
"throttleStatus": {
382+
"maximumAvailable": 20000.0,
383+
"currentlyAvailable": 19999,
384+
"restoreRate": 1000.0
385+
}
386+
}
387+
}
388+
}
389+
361390

362391
@pytest.fixture
363392
def bulk_job_timeout_response():

0 commit comments

Comments
(0)