Skip to content

🐛 Source Shopify: Fix FAILED scenario for BULK streams, fixed deleted events STATE collision #42973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ef41bf9
test
bazarnov Jul 11, 2024
edd3ee8
Merge remote-tracking branch 'origin/master'
bazarnov Jul 11, 2024
cb299d0
Merge remote-tracking branch 'origin/master'
bazarnov Jul 13, 2024
adcae54
Merge branch 'master' of https://github.com/airbytehq/airbyte
bazarnov Jul 16, 2024
5057187
Merge remote-tracking branch 'origin/master'
bazarnov Jul 16, 2024
7203d14
Merge remote-tracking branch 'origin/master'
bazarnov Jul 16, 2024
2a8143d
Merge remote-tracking branch 'origin/master'
bazarnov Jul 17, 2024
f71eb18
Merge remote-tracking branch 'origin/master'
bazarnov Jul 17, 2024
7c05236
Merge remote-tracking branch 'origin/master'
bazarnov Jul 26, 2024
41e1e73
Merge remote-tracking branch 'origin/master'
bazarnov Jul 29, 2024
41d30ba
Merge branch 'master' of https://github.com/airbytehq/airbyte
bazarnov Jul 29, 2024
844dbb8
Merge remote-tracking branch 'origin/master'
bazarnov Jul 30, 2024
6690dd5
Merge remote-tracking branch 'origin/master'
bazarnov Jul 31, 2024
68ed8cd
Merge remote-tracking branch 'origin/master'
bazarnov Aug 2, 2024
59992d7
fixed bulk failed partial jobs for checkpoint-supported streams
bazarnov Aug 2, 2024
1df35c0
fixed cursor overlap for rest streams with deleted events
bazarnov Aug 2, 2024
b955a80
fixed regressions
bazarnov Aug 2, 2024
9250b29
removed useless checks from CAT config
bazarnov Aug 2, 2024
acdf27f
Merge remote-tracking branch 'origin/master' into baz/source/shopify/…
bazarnov Aug 2, 2024
a9929d2
formatted
bazarnov Aug 2, 2024
fcbe2dd
returned missing files
bazarnov Aug 2, 2024
a0f1e4c
Merge remote-tracking branch 'origin/master' into baz/source/shopify/…
bazarnov Aug 2, 2024
69c89f0
Merge remote-tracking branch 'origin/master' into baz/source/shopify/…
bazarnov Aug 2, 2024
2843594
updated changelog
bazarnov Aug 2, 2024
c4b5335
Merge remote-tracking branch 'origin/master' into baz/source/shopify/…
bazarnov Aug 4, 2024
d008e54
updated after the review
bazarnov Aug 5, 2024
46ca3ad
Merge remote-tracking branch 'origin/master' into baz/source/shopify/…
bazarnov Aug 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_shopify/spec.json"
backward_compatibility_tests_config:
# This is the intentional change.
# Added new fields: `job_checkpoint_interval`, `job_product_variants_include_pres_prices`
# to provide the ability to override this value by the User.
disable_for_version: 2.4.14
connection:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
{"stream": "metafield_orders", "data": {"id": 22347287855293, "namespace": "my_fields", "value": "trtrtr", "key": "purchase_order", "description": null, "created_at": "2023-04-13T12:09:08+00:00", "updated_at": "2023-04-13T12:09:08+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22347287855293", "owner_id": 4147980107965, "owner_resource": "order", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953849754}
{"stream": "metafield_pages", "data": {"id": 22365711499453, "namespace": "custom", "key": "test_page_metafield", "value": "Test Page Metafield", "description": null, "owner_id": 93795909821, "created_at": "2023-04-14T03:21:49-07:00", "updated_at": "2023-04-14T03:21:49-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365711499453", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953851755}
{"stream": "metafield_pages", "data": {"id": 22534014828733, "namespace": "new_metafield", "key": "new_metafield", "value": "updated_mon_24.04.2023", "description": null, "owner_id": 83074252989, "created_at": "2023-04-24T11:08:41-07:00", "updated_at": "2023-04-24T11:08:41-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22534014828733", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953852192}
{"stream": "metafield_product_images", "data": {"id": 22365851517117, "namespace": "my_fields", "value": "natural coton", "key": "liner_material", "description": null, "created_at": "2023-04-14T11:59:27+00:00", "updated_at": "2023-04-14T11:59:27+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365851517117", "owner_id": 29301295481021, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953865505}
{"stream": "metafield_product_images", "data": {"id": 22533588451517, "namespace": "new_metafield", "value": "updated_mon_24.04.2023", "key": "new_metafield", "description": null, "created_at": "2023-04-24T17:32:19+00:00", "updated_at": "2023-04-24T17:32:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22533588451517", "owner_id": 29301297316029, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953865509}
{"stream":"metafield_product_images","data":{"id":22365851517117,"namespace":"my_fields","value":"natural coton","key":"liner_material","description":null,"created_at":"2023-04-14T11:59:27+00:00","updated_at":"2024-07-31T14:19:45+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22365851517117","owner_id":21562154025149,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
{"stream":"metafield_product_images","data":{"id":22533588451517,"namespace":"new_metafield","value":"updated_mon_24.04.2023","key":"new_metafield","description":null,"created_at":"2023-04-24T17:32:19+00:00","updated_at":"2024-07-31T14:19:58+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22533588451517","owner_id":21562155892925,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
{"stream": "metafield_products", "data": {"id": 22365729718461, "namespace": "custom", "value": "Test Product Metafield", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T10:31:19+00:00", "updated_at": "2023-04-14T10:31:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365729718461", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872773}
{"stream": "metafield_products", "data": {"id": 22365729816765, "namespace": "custom", "value": "gid://shopify/Product/6796229574845", "key": "test_product_metafield", "description": null, "created_at": "2023-04-14T10:31:29+00:00", "updated_at": "2023-04-14T10:31:29+00:00", "type": "product_reference", "admin_graphql_api_id": "gid://shopify/Metafield/22365729816765", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872775}
{"stream": "metafield_products", "data": {"id": 22365772251325, "namespace": "custom", "value": "Test", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T11:04:46+00:00", "updated_at": "2023-04-14T11:04:46+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365772251325", "owner_id": 6796229574845, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953872776}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
{"stream": "metafield_orders", "data": {"id": 22347287855293, "namespace": "my_fields", "value": "trtrtr", "key": "purchase_order", "description": null, "created_at": "2023-04-13T12:09:08+00:00", "updated_at": "2023-04-13T12:09:08+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22347287855293", "owner_id": 4147980107965, "owner_resource": "order", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953626962}
{"stream": "metafield_pages", "data": {"id": 22365711499453, "namespace": "custom", "key": "test_page_metafield", "value": "Test Page Metafield", "description": null, "owner_id": 93795909821, "created_at": "2023-04-14T03:21:49-07:00", "updated_at": "2023-04-14T03:21:49-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365711499453", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953628925}
{"stream": "metafield_pages", "data": {"id": 22534014828733, "namespace": "new_metafield", "key": "new_metafield", "value": "updated_mon_24.04.2023", "description": null, "owner_id": 83074252989, "created_at": "2023-04-24T11:08:41-07:00", "updated_at": "2023-04-24T11:08:41-07:00", "owner_resource": "page", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22534014828733", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953629370}
{"stream": "metafield_product_images", "data": {"id": 22365851517117, "namespace": "my_fields", "value": "natural coton", "key": "liner_material", "description": null, "created_at": "2023-04-14T11:59:27+00:00", "updated_at": "2023-04-14T11:59:27+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365851517117", "owner_id": 29301295481021, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953637175}
{"stream": "metafield_product_images", "data": {"id": 22533588451517, "namespace": "new_metafield", "value": "updated_mon_24.04.2023", "key": "new_metafield", "description": null, "created_at": "2023-04-24T17:32:19+00:00", "updated_at": "2023-04-24T17:32:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22533588451517", "owner_id": 29301297316029, "owner_resource": "product_image", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953637175}
{"stream":"metafield_product_images","data":{"id":22365851517117,"namespace":"my_fields","value":"natural coton","key":"liner_material","description":null,"created_at":"2023-04-14T11:59:27+00:00","updated_at":"2024-07-31T14:19:45+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22365851517117","owner_id":21562154025149,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
{"stream":"metafield_product_images","data":{"id":22533588451517,"namespace":"new_metafield","value":"updated_mon_24.04.2023","key":"new_metafield","description":null,"created_at":"2023-04-24T17:32:19+00:00","updated_at":"2024-07-31T14:19:58+00:00","type":"single_line_text_field","admin_graphql_api_id":"gid://shopify/Metafield/22533588451517","owner_id":21562155892925,"owner_resource":"media_image","shop_url":"airbyte-integration-test"},"emitted_at":1722611835917}
{"stream": "metafield_products", "data": {"id": 22365729718461, "namespace": "custom", "value": "Test Product Metafield", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T10:31:19+00:00", "updated_at": "2023-04-14T10:31:19+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365729718461", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644458}
{"stream": "metafield_products", "data": {"id": 22365729816765, "namespace": "custom", "value": "gid://shopify/Product/6796229574845", "key": "test_product_metafield", "description": null, "created_at": "2023-04-14T10:31:29+00:00", "updated_at": "2023-04-14T10:31:29+00:00", "type": "product_reference", "admin_graphql_api_id": "gid://shopify/Metafield/22365729816765", "owner_id": 6796226560189, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644459}
{"stream": "metafield_products", "data": {"id": 22365772251325, "namespace": "custom", "value": "Test", "key": "product_metafield_test_2", "description": null, "created_at": "2023-04-14T11:04:46+00:00", "updated_at": "2023-04-14T11:04:46+00:00", "type": "single_line_text_field", "admin_graphql_api_id": "gid://shopify/Metafield/22365772251325", "owner_id": 6796229574845, "owner_resource": "product", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953644459}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.4.16
dockerImageTag: 2.4.17
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.16"
version = "2.4.17"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def _job_completed(self) -> bool:
def _job_canceled(self) -> bool:
    """Tell whether the tracked job state equals the `CANCELED` status value."""
    current_state = self._job_state
    return current_state == ShopifyBulkJobStatus.CANCELED.value

def _job_failed(self) -> bool:
    """Tell whether the tracked job state equals the `FAILED` status value."""
    current_state = self._job_state
    return current_state == ShopifyBulkJobStatus.FAILED.value

def _job_cancel(self) -> None:
_, canceled_response = self.http_client.send_request(
http_method="POST",
Expand Down Expand Up @@ -255,7 +258,9 @@ def _log_state(self, message: Optional[str] = None) -> None:
def _job_get_result(self, response: Optional[requests.Response] = None) -> Optional[str]:
parsed_response = response.json().get("data", {}).get("node", {}) if response else None
# get `complete` or `partial` result from collected Bulk Job results
job_result_url = parsed_response.get("url", parsed_response.get("partialDataUrl")) if parsed_response else None
full_result_url = parsed_response.get("url") if parsed_response else None
partial_result_url = parsed_response.get("partialDataUrl") if parsed_response else None
job_result_url = full_result_url if full_result_url else partial_result_url
if job_result_url:
# save to local file using chunks to avoid OOM
filename = self._tools.filename_from_url(job_result_url)
Expand Down Expand Up @@ -326,13 +331,13 @@ def _on_completed_job(self, response: Optional[requests.Response] = None) -> Non
self._job_result_filename = self._job_get_result(response)

def _on_failed_job(self, response: requests.Response) -> AirbyteTracedException:
if not self._job_any_lines_collected:
if not self._supports_checkpointing:
raise ShopifyBulkExceptions.BulkJobFailed(
f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}",
)
else:
# when the Bulk Job fails, usually there is a `partialDataUrl` available,
# we leverage the checkpointing in this case
# we leverage the checkpointing in this case.
self._job_get_checkpointed_result(response)

def _on_timeout_job(self, **kwargs) -> AirbyteTracedException:
Expand Down Expand Up @@ -434,6 +439,8 @@ def _job_check_state(self) -> None:
while not self._job_completed():
if self._job_canceled():
break
elif self._job_failed():
break
else:
self._job_track_running()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class MetafieldType(Enum):
ORDERS = "orders"
DRAFT_ORDERS = "draftOrders"
PRODUCTS = "products"
PRODUCT_IMAGES = ["products", "images"]
PRODUCT_IMAGES = "products"
PRODUCT_VARIANTS = "productVariants"
COLLECTIONS = "collections"
LOCATIONS = "locations"
Expand Down Expand Up @@ -491,25 +491,30 @@ class MetafieldProduct(Metafield):
class MetafieldProductImage(Metafield):
"""
{
products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
products(query: "updated_at:>='2023-01-08T00:00:00+00:00' AND updated_at:<='2024-08-02T15:12:41.689153+00:00'", sortKey: UPDATED_AT) {
edges {
node {
__typename
id
images{
edges{
node{
media {
edges {
node {
__typename
id
metafields {
edges {
node {
id
namespace
value
key
description
createdAt
updatedAt
type
... on MediaImage {
metafields {
edges {
node {
__typename
id
namespace
value
key
description
createdAt
updatedAt
type
}
}
}
}
Expand All @@ -522,6 +527,28 @@ class MetafieldProductImage(Metafield):
}
"""

@property
def query_nodes(self) -> List[Field]:
    """
    Override of the default `query_nodes` property.

    The usual way of retrieving the metafields for `product images` was suddenly deprecated
    for `2024-10`, but the changes are reflected in `2024-04` as well, starting: `2024-08-01T00:06:44`.
    The metafields are therefore selected under the `media` edge, inside a `MediaImage` inline fragment.

    More info here:
    https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed
    """
    # the metafields selection is only requested on `MediaImage` nodes
    image_metafields = InlineFragment(
        type="MediaImage",
        fields=[self.get_edge_node("metafields", self.metafield_fields)],
    )
    # wrap the typed selection into the `media` edge
    media_edge = self.get_edge_node("media", ["__typename", "id", image_metafields])
    return ["__typename", "id", media_edge]

type = MetafieldType.PRODUCT_IMAGES


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def produce_deleted_records_from_events(self, delete_events: Iterable[Mapping[st
yield {
"id": event["subject_id"],
self.cursor_field: event["created_at"],
"updated_at": event["created_at"],
"deleted_message": event["message"],
"deleted_description": event["description"],
"shop_url": event["shop_url"],
Expand Down Expand Up @@ -571,6 +570,10 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg


class IncrementalShopifyStreamWithDeletedEvents(IncrementalShopifyStream):
def __init__(self, config: Dict) -> None:
    """
    Create the per-instance stream state holder, then run the parent initialisation.

    :param config: passed through unchanged to the parent constructor.
    """
    # start from an empty state mapping; it is assigned before `super().__init__` is invoked
    self._stream_state: MutableMapping[str, Any] = {}
    super().__init__(config)

@property
@abstractmethod
def deleted_events_api_name(self) -> str:
Expand Down Expand Up @@ -607,13 +610,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
"""
We extend the stream state with the `deleted` property to store the `destroyed` records STATE separately from the Stream State.
"""
state = super().get_updated_state(current_stream_state, latest_record)
self._stream_state = super().get_updated_state(self._stream_state, latest_record)
# add the `deleted` property to each stream that supports `deleted events`,
# to provide the `Incremental` sync mode for the `Incremental Delete` records.
last_deleted_record_value = latest_record.get(self.deleted_cursor_field) or self.default_deleted_state_comparison_value
current_deleted_state_value = current_stream_state.get(self.deleted_cursor_field) or self.default_deleted_state_comparison_value
state["deleted"] = {self.deleted_cursor_field: max(last_deleted_record_value, current_deleted_state_value)}
return state
self._stream_state["deleted"] = {self.deleted_cursor_field: max(last_deleted_record_value, current_deleted_state_value)}
return self._stream_state

def read_records(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,35 @@ def bulk_job_failed_response():
},
}


@pytest.fixture
def bulk_job_failed_with_partial_url_response():
    """
    Response payload for a BULK job in `FAILED` status where `url` is None
    and only `partialDataUrl` is populated.
    """
    # two adjacent literals are joined into a single URL string
    partial_data_url = (
        'https://some_url?response-content-disposition=attachment;+filename="bulk-123456789.jsonl";+filename*=UTF-8'
        "bulk-123456789.jsonl&response-content-type=application/jsonl"
    )
    failed_node = {
        "id": "gid://shopify/BulkOperation/123",
        "status": "FAILED",
        "errorCode": "INTERNAL_SERVER_ERROR",
        "objectCount": "432",
        "fileSize": None,
        "url": None,
        "partialDataUrl": partial_data_url,
    }
    throttle_status = {
        "maximumAvailable": 20000.0,
        "currentlyAvailable": 19999,
        "restoreRate": 1000.0,
    }
    query_cost = {
        "requestedQueryCost": 1,
        "actualQueryCost": 1,
        "throttleStatus": throttle_status,
    }
    return {
        "data": {"node": failed_node},
        "extensions": {"cost": query_cost},
    }


@pytest.fixture
def bulk_job_timeout_response():
Expand Down
Loading
Loading