Skip to content

Commit eb6fa3b

Browse files
author
Baz
authored
🐛 🎉 Source PayPal Transaction: added OAuth2.0, fixed bug with normalization (#15000)
1 parent bbd6540 commit eb6fa3b

File tree

15 files changed

+353
-64
lines changed

15 files changed

+353
-64
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,11 +691,11 @@
691691
- name: Paypal Transaction
692692
sourceDefinitionId: d913b0f2-cc51-4e55-a44c-8ba1697b9239
693693
dockerRepository: airbyte/source-paypal-transaction
694-
dockerImageTag: 0.1.7
694+
dockerImageTag: 0.1.8
695695
documentationUrl: https://docs.airbyte.io/integrations/sources/paypal-transaction
696696
icon: paypal.svg
697697
sourceType: api
698-
releaseStage: alpha
698+
releaseStage: beta
699699
- name: Paystack
700700
sourceDefinitionId: 193bdcb8-1dd9-48d1-aade-91cadfd74f9b
701701
dockerRepository: airbyte/source-paystack

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6708,29 +6708,66 @@
67086708
supportsNormalization: false
67096709
supportsDBT: false
67106710
supported_destination_sync_modes: []
6711-
- dockerImage: "airbyte/source-paypal-transaction:0.1.7"
6711+
- dockerImage: "airbyte/source-paypal-transaction:0.1.8"
67126712
spec:
67136713
documentationUrl: "https://docs.airbyte.io/integrations/sources/paypal-transactions"
67146714
connectionSpecification:
67156715
$schema: "http://json-schema.org/draft-07/schema#"
67166716
title: "Paypal Transaction Search"
67176717
type: "object"
67186718
required:
6719-
- "client_id"
6720-
- "secret"
67216719
- "start_date"
67226720
- "is_sandbox"
67236721
additionalProperties: true
67246722
properties:
6725-
client_id:
6726-
title: "Client ID"
6727-
type: "string"
6728-
description: "The Client ID of your Paypal developer application."
6729-
secret:
6730-
title: "Client Secret"
6731-
type: "string"
6732-
description: "The Client Secret of your Paypal developer application."
6733-
airbyte_secret: true
6723+
credentials:
6724+
title: "Authenticate using"
6725+
type: "object"
6726+
oneOf:
6727+
- type: "object"
6728+
title: "OAuth2.0"
6729+
required:
6730+
- "auth_type"
6731+
- "refresh_token"
6732+
properties:
6733+
auth_type:
6734+
type: "string"
6735+
const: "oauth2.0"
6736+
client_id:
6737+
type: "string"
6738+
title: "Client ID"
6739+
description: "The Client ID of your Paypal developer application."
6740+
airbyte_secret: true
6741+
client_secret:
6742+
type: "string"
6743+
title: "Client secret"
6744+
description: "The Client Secret of your Paypal developer application."
6745+
airbyte_secret: true
6746+
refresh_token:
6747+
type: "string"
6748+
title: "Refresh token"
6749+
description: "The key to refresh the expired access token."
6750+
airbyte_secret: true
6751+
- title: "Private OAuth Creds"
6752+
type: "object"
6753+
required:
6754+
- "auth_type"
6755+
- "client_id"
6756+
- "client_secret"
6757+
properties:
6758+
auth_type:
6759+
type: "string"
6760+
const: "private_oauth"
6761+
client_id:
6762+
type: "string"
6763+
title: "Client ID"
6764+
description: "The Client ID of your Paypal developer application."
6765+
airbyte_secret: true
6766+
client_secret:
6767+
type: "string"
6768+
title: "Client secret"
6769+
description: "The Client Secret of your Paypal developer application."
6770+
airbyte_secret: true
67346771
start_date:
67356772
type: "string"
67366773
title: "Start Date"
@@ -6748,6 +6785,17 @@
67486785
supportsNormalization: false
67496786
supportsDBT: false
67506787
supported_destination_sync_modes: []
6788+
authSpecification:
6789+
auth_type: "oauth2.0"
6790+
oauth2Specification:
6791+
rootObject:
6792+
- "credentials"
6793+
- "0"
6794+
oauthFlowInitParameters:
6795+
- - "client_id"
6796+
- - "client_secret"
6797+
oauthFlowOutputParameters:
6798+
- - "refresh_token"
67516799
- dockerImage: "airbyte/source-paystack:0.1.1"
67526800
spec:
67536801
documentationUrl: "https://docs.airbyte.io/integrations/sources/paystack"

airbyte-integrations/connectors/source-paypal-transaction/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.1.7
15+
LABEL io.airbyte.version=0.1.8
1616
LABEL io.airbyte.name=airbyte/source-paypal-transaction

airbyte-integrations/connectors/source-paypal-transaction/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
7979
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
8080
First install test dependencies into your virtual environment:
8181
```
82-
pip install .[tests]
82+
pip install '.[tests]'
8383
```
8484
### Unit Tests
8585
To run unit tests locally, from the connector directory run:

airbyte-integrations/connectors/source-paypal-transaction/acceptance-test-config.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ tests:
99
status: "succeed"
1010
- config_path: "integration_tests/invalid_config.json"
1111
status: "failed"
12+
- config_path: "secrets/config_oauth.json"
13+
status: "succeed"
14+
- config_path: "integration_tests/invalid_config_oauth.json"
15+
status: "failed"
1216
discovery:
1317
- config_path: "secrets/config.json"
1418
basic_read:

airbyte-integrations/connectors/source-paypal-transaction/integration_tests/configured_catalog.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
"name": "transactions",
66
"json_schema": {},
77
"source_defined_cursor": true,
8-
"default_cursor_field": [
9-
"transaction_info",
10-
"transaction_initiation_date"
11-
],
8+
"default_cursor_field": ["transaction_initiation_date"],
129
"supported_sync_modes": ["full_refresh", "incremental"]
1310
},
1411
"sync_mode": "incremental",

airbyte-integrations/connectors/source-paypal-transaction/integration_tests/configured_catalog_transactions.json

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
"name": "transactions",
66
"json_schema": {},
77
"source_defined_cursor": true,
8-
"default_cursor_field": [
9-
"transaction_info",
10-
"transaction_initiation_date"
11-
],
8+
"default_cursor_field": ["transaction_initiation_date"],
129
"supported_sync_modes": ["full_refresh", "incremental"]
1310
},
1411
"sync_mode": "incremental",
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"credentials": {
3+
"auth_type": "oauth2.0",
4+
"client_id": "AWA__",
5+
"secret": "ENC__",
6+
"refresh_token": "__"
7+
},
8+
"start_date": "2021-07-03T00:00:00+00:00",
9+
"end_date": "2021-07-04T23:59:59+00:00",
10+
"is_sandbox": false
11+
}

airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/schemas/transactions.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@
129129
}
130130
}
131131
},
132+
"transaction_id": {
133+
"type": ["null", "string"],
134+
"maxLength": 24
135+
},
136+
"transaction_initiation_date": {
137+
"type": ["null", "string"],
138+
"format": "date-time"
139+
},
132140
"payer_info": {
133141
"type": ["null", "object"],
134142
"properties": {

airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/source.py

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
import base64
56
import json
67
import logging
78
import time
@@ -51,10 +52,9 @@ def __repr__(self):
5152

5253
def get_endpoint(is_sandbox: bool = False) -> str:
5354
if is_sandbox:
54-
endpoint = "https://api-m.sandbox.paypal.com"
55-
else:
56-
endpoint = "https://api-m.paypal.com"
57-
return endpoint
55+
return "https://api-m.sandbox.paypal.com"
56+
57+
return "https://api-m.paypal.com"
5858

5959

6060
class PaypalTransactionStream(HttpStream, ABC):
@@ -82,6 +82,10 @@ class PaypalTransactionStream(HttpStream, ABC):
8282
stream_slice_period: Mapping[str, int] = {"days": 15} # max period is 31 days (API limit)
8383

8484
requests_per_minute: int = 30 # API limit is 50 reqs/min from 1 IP to all endpoints, otherwise IP is banned for 5 mins
85+
# if the stream has nested cursor_field, we should trry to unnest it once parsing the recods to avoid normalization conflicts.
86+
unnest_cursor: bool = False
87+
unnest_pk: bool = False
88+
nested_object: str = None
8589

8690
def __init__(
8791
self,
@@ -156,11 +160,25 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
156160
# In order to support direct datetime string comparison (which is performed in incremental acceptance tests)
157161
# convert any date format to python iso format string for date based cursors
158162
self.update_field(record, self.cursor_field, lambda date: isoparse(date).isoformat())
163+
# unnest cursor_field to handle normalization correctly
164+
if self.unnest_cursor:
165+
self.unnest_field(record, self.nested_object, self.cursor_field)
166+
# unnest primary_key to handle normalization correctly
167+
if self.unnest_pk:
168+
self.unnest_field(record, self.nested_object, self.primary_key)
159169
yield record
160170

161171
# sleep for 1-2 secs to not reach rate limit: 50 requests per minute
162172
time.sleep(60 / self.requests_per_minute)
163173

174+
@staticmethod
175+
def unnest_field(record: Mapping[str, Any], unnest_from: Dict, cursor_field: str):
176+
"""
177+
Unnest cursor_field to the root level of the record.
178+
"""
179+
if unnest_from in record:
180+
record[cursor_field] = record.get(unnest_from).get(cursor_field)
181+
164182
@staticmethod
165183
def update_field(record: Mapping[str, Any], field_path: Union[List[str], str], update: Callable[[Any], None]):
166184
if not isinstance(field_path, List):
@@ -332,8 +350,14 @@ class Transactions(PaypalTransactionStream):
332350
"""
333351

334352
data_field = "transaction_details"
335-
primary_key = [["transaction_info", "transaction_id"]]
336-
cursor_field = ["transaction_info", "transaction_initiation_date"]
353+
nested_object = "transaction_info"
354+
355+
primary_key = "transaction_id"
356+
cursor_field = "transaction_initiation_date"
357+
358+
unnest_cursor = True
359+
unnest_pk = True
360+
337361
transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization)
338362

339363
# TODO handle API error when 1 request returns more than 10000 records.
@@ -402,39 +426,67 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
402426

403427
class PayPalOauth2Authenticator(Oauth2Authenticator):
404428
"""Request example for API token extraction:
405-
curl -v POST https://api-m.sandbox.paypal.com/v1/oauth2/token \
406-
-H "Accept: application/json" \
407-
-H "Accept-Language: en_US" \
408-
-u "CLIENT_ID:SECRET" \
409-
-d "grant_type=client_credentials"
429+
For `old_config` scenario:
430+
curl -v POST https://api-m.sandbox.paypal.com/v1/oauth2/token \
431+
-H "Accept: application/json" \
432+
-H "Accept-Language: en_US" \
433+
-u "CLIENT_ID:SECRET" \
434+
-d "grant_type=client_credentials"
410435
"""
411436

412-
def __init__(self, config):
413-
super().__init__(
414-
token_refresh_endpoint=f"{get_endpoint(config['is_sandbox'])}/v1/oauth2/token",
415-
client_id=config["client_id"],
416-
client_secret=config["secret"],
417-
refresh_token="",
418-
)
437+
def __init__(self, config: Dict):
438+
self.old_config: bool = False
439+
# default auth args
440+
self.auth_args: Dict = {
441+
"token_refresh_endpoint": f"{get_endpoint(config['is_sandbox'])}/v1/oauth2/token",
442+
"refresh_token": "",
443+
}
444+
# support old configs
445+
if "client_id" and "secret" in config.keys():
446+
self.old_config = True
447+
self.auth_args.update(**{"client_id": config["client_id"], "client_secret": config["secret"]})
448+
# new configs
449+
if "credentials" in config.keys():
450+
credentials = config.get("credentials")
451+
auth_type = credentials.get("auth_type")
452+
self.auth_args.update(**{"client_id": credentials["client_id"], "client_secret": credentials["client_secret"]})
453+
if auth_type == "oauth2.0":
454+
self.auth_args["refresh_token"] = credentials["refresh_token"]
455+
elif auth_type == "private_oauth":
456+
self.old_config = True
457+
458+
self.config = config
459+
super().__init__(**self.auth_args)
460+
461+
def get_headers(self):
462+
# support old configs
463+
if self.old_config:
464+
return {"Accept": "application/json", "Accept-Language": "en_US"}
465+
# new configs
466+
basic_auth = base64.b64encode(bytes(f"{self.client_id}:{self.client_secret}", "utf-8")).decode("utf-8")
467+
return {"Authorization": f"Basic {basic_auth}"}
419468

420469
def get_refresh_request_body(self) -> Mapping[str, Any]:
421-
return {"grant_type": "client_credentials"}
470+
# support old configs
471+
if self.old_config:
472+
return {"grant_type": "client_credentials"}
473+
# new configs
474+
return {"grant_type": "refresh_token", "refresh_token": self.refresh_token}
422475

423476
def refresh_access_token(self) -> Tuple[str, int]:
424477
"""
425478
returns a tuple of (access_token, token_lifespan_in_seconds)
426479
"""
480+
request_args = {
481+
"url": self.token_refresh_endpoint,
482+
"data": self.get_refresh_request_body(),
483+
"headers": self.get_headers(),
484+
}
427485
try:
428-
data = "grant_type=client_credentials"
429-
headers = {"Accept": "application/json", "Accept-Language": "en_US"}
430-
auth = (self.client_id, self.client_secret)
431-
response = requests.request(
432-
method="POST",
433-
url=self.token_refresh_endpoint,
434-
data=data,
435-
headers=headers,
436-
auth=auth,
437-
)
486+
# support old configs
487+
if self.old_config:
488+
request_args["auth"] = (self.client_id, self.client_secret)
489+
response = requests.post(**request_args)
438490
response.raise_for_status()
439491
response_json = response.json()
440492
return response_json["access_token"], response_json["expires_in"]

0 commit comments

Comments
 (0)