Skip to content

Commit 9c878c6

Browse files
marcosmarxmoctavia-squidington-iii
authored andcommitted
Source Cart.com: implement Central API Router access method and improve backoff policy (airbytehq#16612)
* add central_api_route param in spec.json * add central api router and better backoff policy * move logic to auth class * finish central api router * update auth methods * fix tests * readded config tests using access token * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <[email protected]>
1 parent f6b9e1e commit 9c878c6

File tree

10 files changed

+341
-47
lines changed

10 files changed

+341
-47
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@
136136
- name: Cart.com
137137
sourceDefinitionId: bb1a6d31-6879-4819-a2bd-3eed299ea8e2
138138
dockerRepository: airbyte/source-cart
139-
dockerImageTag: 0.1.6
139+
dockerImageTag: 0.2.0
140140
documentationUrl: https://docs.airbyte.io/integrations/sources/cart
141141
icon: cart.svg
142142
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,30 +1473,80 @@
14731473
supportsNormalization: false
14741474
supportsDBT: false
14751475
supported_destination_sync_modes: []
1476-
- dockerImage: "airbyte/source-cart:0.1.6"
1476+
- dockerImage: "airbyte/source-cart:0.2.0"
14771477
spec:
14781478
documentationUrl: "https://docs.airbyte.io/integrations/sources/cart"
14791479
connectionSpecification:
14801480
$schema: "http://json-schema.org/draft-07/schema#"
14811481
title: "Cart.com Spec"
14821482
type: "object"
14831483
required:
1484-
- "access_token"
14851484
- "start_date"
1486-
- "store_name"
14871485
additionalProperties: true
14881486
properties:
1489-
access_token:
1490-
type: "string"
1491-
title: "Access Token"
1492-
airbyte_secret: true
1493-
description: "Access Token for making authenticated requests."
1494-
store_name:
1495-
type: "string"
1496-
title: "Store Name"
1497-
description: "The name of Cart.com Online Store. All API URLs start with\
1498-
\ https://[mystorename.com]/api/v1/, where [mystorename.com] is the domain\
1499-
\ name of your store."
1487+
credentials:
1488+
title: "Authorization Method"
1489+
description: ""
1490+
type: "object"
1491+
oneOf:
1492+
- title: "Central API Router"
1493+
type: "object"
1494+
order: 0
1495+
required:
1496+
- "auth_type"
1497+
- "user_name"
1498+
- "user_secret"
1499+
- "site_id"
1500+
properties:
1501+
auth_type:
1502+
type: "string"
1503+
const: "CENTRAL_API_ROUTER"
1504+
order: 0
1505+
user_name:
1506+
type: "string"
1507+
title: "User Name"
1508+
description: "Enter your application's User Name"
1509+
airbyte_secret: true
1510+
order: 1
1511+
user_secret:
1512+
type: "string"
1513+
title: "User Secret"
1514+
description: "Enter your application's User Secret"
1515+
airbyte_secret: true
1516+
order: 2
1517+
site_id:
1518+
type: "string"
1519+
title: "Site ID"
1520+
description: "You can determine a site provisioning site Id by hitting\
1521+
\ https://site.com/store/sitemonitor.aspx and reading the response\
1522+
\ param PSID"
1523+
airbyte_secret: true
1524+
order: 3
1525+
- title: "Single Store Access Token"
1526+
type: "object"
1527+
order: 1
1528+
required:
1529+
- "auth_type"
1530+
- "access_token"
1531+
- "store_name"
1532+
properties:
1533+
auth_type:
1534+
type: "string"
1535+
const: "SINGLE_STORE_ACCESS_TOKEN"
1536+
order: 0
1537+
access_token:
1538+
type: "string"
1539+
title: "Access Token"
1540+
airbyte_secret: true
1541+
order: 1
1542+
description: "Access Token for making authenticated requests."
1543+
store_name:
1544+
type: "string"
1545+
title: "Store Name"
1546+
order: 2
1547+
description: "The name of Cart.com Online Store. All API URLs start\
1548+
\ with https://[mystorename.com]/api/v1/, where [mystorename.com]\
1549+
\ is the domain name of your store."
15001550
start_date:
15011551
title: "Start Date"
15021552
type: "string"

airbyte-integrations/connectors/source-cart/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ COPY source_cart ./source_cart
2121
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
2222
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
2323

24-
LABEL io.airbyte.version=0.1.6
24+
LABEL io.airbyte.version=0.2.0
2525
LABEL io.airbyte.name=airbyte/source-cart

airbyte-integrations/connectors/source-cart/acceptance-test-config.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ connector_image: airbyte/source-cart:dev
44
tests:
55
spec:
66
- spec_path: "source_cart/spec.json"
7+
backward_compatibility_tests_config:
8+
disable_for_version: "0.1.6"
79
connection:
810
- config_path: "secrets/config.json"
911
status: "succeed"
@@ -12,16 +14,26 @@ tests:
1214
timeout_seconds: 180
1315
discovery:
1416
- config_path: "secrets/config.json"
17+
backward_compatibility_tests_config:
18+
disable_for_version: "0.1.6"
1519
basic_read:
1620
- config_path: "secrets/config.json"
1721
configured_catalog_path: "integration_tests/configured_catalog.json"
1822
timeout_seconds: 1800
1923
incremental:
24+
- config_path: "secrets/config_central_api_router.json"
25+
configured_catalog_path: "integration_tests/configured_catalog_wo_order_statuses.json"
26+
future_state_path: "integration_tests/abnormal_state.json"
27+
timeout_seconds: 1800
2028
- config_path: "secrets/config.json"
2129
configured_catalog_path: "integration_tests/configured_catalog.json"
2230
future_state_path: "integration_tests/abnormal_state.json"
2331
timeout_seconds: 1800
2432
full_refresh:
33+
- config_path: "secrets/config_central_api_router.json"
34+
configured_catalog_path: "integration_tests/configured_catalog_wo_order_statuses.json"
35+
timeout_seconds: 1800
2536
- config_path: "secrets/config.json"
2637
configured_catalog_path: "integration_tests/configured_catalog.json"
2738
timeout_seconds: 1800
39+
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
{
2+
"streams": [
3+
{
4+
"stream": {
5+
"name": "customers_cart",
6+
"json_schema": {},
7+
"supported_sync_modes": ["full_refresh", "incremental"],
8+
"source_defined_cursor": true,
9+
"default_cursor_field": ["updated_at"]
10+
},
11+
"sync_mode": "incremental",
12+
"cursor_field": ["updated_at"],
13+
"destination_sync_mode": "append"
14+
},
15+
{
16+
"stream": {
17+
"name": "orders",
18+
"json_schema": {},
19+
"supported_sync_modes": ["full_refresh", "incremental"],
20+
"source_defined_cursor": true,
21+
"default_cursor_field": ["updated_at"]
22+
},
23+
"sync_mode": "incremental",
24+
"cursor_field": ["updated_at"],
25+
"destination_sync_mode": "append"
26+
},
27+
{
28+
"stream": {
29+
"name": "addresses",
30+
"json_schema": {},
31+
"supported_sync_modes": ["full_refresh", "incremental"],
32+
"source_defined_cursor": true,
33+
"default_cursor_field": ["updated_at"]
34+
},
35+
"sync_mode": "incremental",
36+
"cursor_field": ["updated_at"],
37+
"destination_sync_mode": "append"
38+
},
39+
{
40+
"stream": {
41+
"name": "order_payments",
42+
"json_schema": {},
43+
"supported_sync_modes": ["full_refresh", "incremental"],
44+
"source_defined_cursor": true,
45+
"default_cursor_field": ["updated_at"]
46+
},
47+
"sync_mode": "incremental",
48+
"cursor_field": ["updated_at"],
49+
"destination_sync_mode": "append"
50+
},
51+
{
52+
"stream": {
53+
"name": "order_items",
54+
"json_schema": {},
55+
"supported_sync_modes": ["full_refresh", "incremental"],
56+
"source_defined_cursor": true,
57+
"default_cursor_field": ["updated_at"]
58+
},
59+
"sync_mode": "incremental",
60+
"cursor_field": ["updated_at"],
61+
"destination_sync_mode": "append"
62+
},
63+
{
64+
"stream": {
65+
"name": "products",
66+
"json_schema": {},
67+
"supported_sync_modes": ["full_refresh", "incremental"],
68+
"source_defined_cursor": true,
69+
"default_cursor_field": ["updated_at"]
70+
},
71+
"sync_mode": "incremental",
72+
"cursor_field": ["updated_at"],
73+
"destination_sync_mode": "append"
74+
}
75+
]
76+
}
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
{
2-
"access_token": "1234567890absc",
3-
"start_date": "2021-01-01T00:00:00Z",
4-
"store_name": "www.my_pretyy_store.com"
2+
"start_date": "2022-01-01T00:00:00Z",
3+
"credentials": {
4+
"auth_type": "SINGLE_STORE_ACCESS_TOKEN",
5+
"store_name": "www.my_pretyy_store.com",
6+
"access_token": "1234567890absc"
7+
}
58
}

airbyte-integrations/connectors/source-cart/source_cart/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@
2222
SOFTWARE.
2323
"""
2424

25-
from .source import SourceCart
25+
from .source import CentralAPIHeaderAuthenticator, CustomHeaderAuthenticator, SourceCart
2626

27-
__all__ = ["SourceCart"]
27+
__all__ = ["SourceCart", "CentralAPIHeaderAuthenticator", "CustomHeaderAuthenticator"]

airbyte-integrations/connectors/source-cart/source_cart/source.py

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
import base64
6+
import codecs
7+
import hashlib
8+
import hmac
9+
import urllib.parse
10+
from enum import Enum
511
from functools import wraps
612
from typing import Any, List, Mapping, Tuple
713

@@ -17,13 +23,65 @@
1723
from .streams import Addresses, CustomersCart, OrderItems, OrderPayments, Orders, OrderStatuses, Products
1824

1925

26+
class AuthMethod(Enum):
27+
CENTRAL_API_ROUTER = 1
28+
SINGLE_STORE_ACCESS_TOKEN = 2
29+
30+
2031
class CustomHeaderAuthenticator(HttpAuthenticator):
21-
def __init__(self, access_token):
32+
def __init__(self, access_token, store_name):
33+
self.auth_method = AuthMethod.SINGLE_STORE_ACCESS_TOKEN
34+
self._store_name = store_name
2235
self._access_token = access_token
2336

2437
def get_auth_header(self) -> Mapping[str, Any]:
2538
return {"X-AC-Auth-Token": self._access_token}
2639

40+
def url_base(self) -> str:
41+
return f"https://{self._store_name}/api/v1/"
42+
43+
def extra_params(self, stream, params):
44+
return {}
45+
46+
47+
class CentralAPIHeaderAuthenticator(HttpAuthenticator):
48+
def __init__(self, user_name, user_secret, site_id):
49+
self.auth_method = AuthMethod.CENTRAL_API_ROUTER
50+
self.user_name = user_name
51+
self.user_secret = user_secret
52+
self.site_id = site_id
53+
54+
def get_auth_header(self) -> Mapping[str, Any]:
55+
"""
56+
This method is not implemented here because for the Central API Router
57+
needs to build the header for each request based
58+
on path + parameters (next token, pagination, page size)
59+
To solve this the logic was moved to `request_headers` in CartStream class.
60+
"""
61+
return {}
62+
63+
def url_base(self) -> str:
64+
return "https://public.americommerce.com/api/v1/"
65+
66+
def extra_params(self, stream, params):
67+
return self.generate_auth_signature(stream, params)
68+
69+
def generate_auth_signature(self, stream, params) -> Mapping[str, Any]:
70+
"""
71+
How to build signature:
72+
1. build a string concatenated with:
73+
request method (uppercase) & request path and query & provisioning user name
74+
example: GET&/api/v1/customers&myUser
75+
2. Generate HMACSHA256 hash using this string as the input, and the provisioning user secret as the key
76+
3. Base64 this hash to be used as the final value in the header
77+
"""
78+
path_with_params = f"/api/v1/{stream.path()}?{urllib.parse.urlencode(params)}"
79+
msg = codecs.encode(f"GET&{path_with_params}&{self.user_name}")
80+
key = codecs.encode(self.user_secret)
81+
dig = hmac.new(key=key, msg=msg, digestmod=hashlib.sha256).digest()
82+
auth_signature = base64.b64encode(dig).decode()
83+
return {"X-AC-PUB-Site-ID": self.site_id, "X-AC-PUB-User": self.user_name, "X-AC-PUB-Auth-Signature": auth_signature}
84+
2785

2886
class SourceCart(AbstractSource):
2987
def validate_config_values(func):
@@ -48,12 +106,26 @@ def decorator(self_, *args, **kwargs):
48106

49107
return decorator
50108

109+
def get_auth(self, config):
110+
credentials = config.get("credentials", {})
111+
auth_method = credentials.get("auth_type")
112+
113+
if auth_method == AuthMethod.CENTRAL_API_ROUTER.name:
114+
authenticator = CentralAPIHeaderAuthenticator(
115+
user_name=credentials["user_name"], user_secret=credentials["user_secret"], site_id=credentials["site_id"]
116+
)
117+
elif auth_method == AuthMethod.SINGLE_STORE_ACCESS_TOKEN.name:
118+
authenticator = CustomHeaderAuthenticator(access_token=credentials["access_token"], store_name=credentials["store_name"])
119+
else:
120+
raise NotImplementedError(f"Authentication method: {auth_method} not implemented.")
121+
122+
return authenticator
123+
51124
@validate_config_values
52125
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
53126
try:
54-
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
55-
56-
stream = Products(authenticator=authenticator, start_date=config["start_date"], store_name=config["store_name"])
127+
authenticator = self.get_auth(config)
128+
stream = Products(authenticator=authenticator, start_date=config["start_date"])
57129
records = stream.read_records(sync_mode=SyncMode.full_refresh)
58130
next(records)
59131
return True, None
@@ -67,11 +139,10 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
67139

68140
@validate_config_values
69141
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
70-
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
142+
authenticator = self.get_auth(config)
71143
args = {
72144
"authenticator": authenticator,
73145
"start_date": config["start_date"],
74-
"store_name": config["store_name"],
75146
"end_date": config.get("end_date"),
76147
}
77148
return [

0 commit comments

Comments
 (0)