Commit 5f55f25

Authored by Baz
🐛 Source Netsuite: fix early adopter issues (#19798)
1 parent f5b793d commit 5f55f25

File tree: 10 files changed (+168, −92 lines)

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -1029,7 +1029,7 @@
 - name: Netsuite
   sourceDefinitionId: 4f2f093d-ce44-4121-8118-9d13b7bfccd0
   dockerRepository: airbyte/source-netsuite
-  dockerImageTag: 0.1.1
+  dockerImageTag: 0.1.2
   documentationUrl: https://docs.airbyte.com/integrations/sources/netsuite
   sourceType: api
   releaseStage: alpha

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
@@ -9205,7 +9205,7 @@
     supportsNormalization: false
     supportsDBT: false
     supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-netsuite:0.1.1"
+- dockerImage: "airbyte/source-netsuite:0.1.2"
   spec:
     documentationUrl: "https://docsurl.com"
     connectionSpecification:

airbyte-integrations/connectors/source-netsuite/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -35,5 +35,5 @@ COPY source_netsuite ./source_netsuite
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.1.2
 LABEL io.airbyte.name=airbyte/source-netsuite

airbyte-integrations/connectors/source-netsuite/acceptance-test-config.yml

Lines changed: 3 additions & 2 deletions
@@ -10,8 +10,8 @@ tests:
     - config_path: "sample_files/invalid_config.json"
       status: "failed"
   discovery:
-    # Discovery stage is dynamic, so timeout iscreased
     - config_path: "secrets/config.json"
+      # Discovery stage is dynamic, so timeout iscreased
       timeout_seconds: 1200
   basic_read:
     - config_path: "secrets/config.json"
@@ -33,4 +33,5 @@ tests:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/configured_catalog.json"
       future_state_path: "integration_tests/abnormal_state.json"
-      timeout_seconds: 3600
+      timeout_seconds: 7200
+      threshold_days: 30

airbyte-integrations/connectors/source-netsuite/integration_tests/configured_catalog.json

Lines changed: 8 additions & 20 deletions
@@ -2,24 +2,24 @@
   "streams": [
     {
       "stream": {
-        "name": "customrecord01",
+        "name": "customer",
         "json_schema": {},
-        "supported_sync_modes": ["full_refresh"]
+        "supported_sync_modes": ["full_refresh", "incremental"]
       },
+      "source_defined_cursor": true,
+      "default_cursor_field": ["lastModifiedDate"],
       "source_defined_primary_key": [["id"]],
-      "sync_mode": "full_refresh",
+      "sync_mode": "incremental",
       "destination_sync_mode": "append"
     },
     {
       "stream": {
-        "name": "customer",
+        "name": "customrecord01",
         "json_schema": {},
-        "supported_sync_modes": ["full_refresh", "incremental"]
+        "supported_sync_modes": ["full_refresh"]
       },
-      "source_defined_cursor": true,
-      "default_cursor_field": ["lastModifiedDate"],
       "source_defined_primary_key": [["id"]],
-      "sync_mode": "incremental",
+      "sync_mode": "full_refresh",
       "destination_sync_mode": "append"
     },
     {
@@ -94,18 +94,6 @@
       "sync_mode": "incremental",
       "destination_sync_mode": "append"
     },
-    {
-      "stream": {
-        "name": "task",
-        "json_schema": {},
-        "supported_sync_modes": ["full_refresh", "incremental"]
-      },
-      "source_defined_cursor": true,
-      "default_cursor_field": ["lastModifiedDate"],
-      "source_defined_primary_key": [["id"]],
-      "sync_mode": "incremental",
-      "destination_sync_mode": "append"
-    },
     {
       "stream": {
         "name": "salesorder",

airbyte-integrations/connectors/source-netsuite/setup.py

Lines changed: 2 additions & 2 deletions
@@ -6,8 +6,8 @@
 from setuptools import find_packages, setup
 
 MAIN_REQUIREMENTS = [
-    "airbyte-cdk~=0.1",
-    "requests-oauthlib~=1.3",
+    "airbyte-cdk",
+    "requests-oauthlib",
 ]
 
 TEST_REQUIREMENTS = [

airbyte-integrations/connectors/source-netsuite/source_netsuite/constraints.py

Lines changed: 3 additions & 8 deletions
@@ -38,11 +38,6 @@
 INCREMENTAL_CURSOR: str = "lastModifiedDate"
 CUSTOM_INCREMENTAL_CURSOR: str = "lastmodified"
 
-# NETSUITE ERROR CODES BY THEIR HTTP TWINS
-NETSUITE_ERRORS_MAPPING: dict = {
-    400: {
-        "USER_ERROR": "reading an Admin record allowed for Admin only",
-        "NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
-        "INVALID_PARAMETER": "cannot read or find the object. Skipping",
-    },
-}
+
+NETSUITE_INPUT_DATE_FORMATS: list[str] = ["%m/%d/%Y", "%Y-%m-%d"]
+NETSUITE_OUTPUT_DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
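
For illustration only (not part of this commit): a minimal sketch of how the two new constants could be combined to normalize a NetSuite-style input date into the connector's output datetime format. The helper name to_output_format is hypothetical.

from datetime import datetime

NETSUITE_INPUT_DATE_FORMATS: list[str] = ["%m/%d/%Y", "%Y-%m-%d"]
NETSUITE_OUTPUT_DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"

def to_output_format(value: str) -> str:
    # Try each accepted input layout and re-emit the value in the output format.
    for fmt in NETSUITE_INPUT_DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt).strftime(NETSUITE_OUTPUT_DATETIME_FORMAT)
        except ValueError:
            continue
    raise ValueError(f"Unsupported date format: {value}")

# to_output_format("07/25/2022") -> "2022-07-25T00:00:00Z"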
Lines changed: 21 additions & 0 deletions (new file)
@@ -0,0 +1,21 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+# NETSUITE ERROR CODES BY THEIR HTTP TWINS
+NETSUITE_ERRORS_MAPPING: dict = {
+    400: {
+        "USER_ERROR": "reading an Admin record allowed for Admin only",
+        "NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
+        "INVALID_PARAMETER": "cannot read or find the object. Skipping",
+    },
+    403: {
+        "INSUFFICIENT_PERMISSION": "not enough permissions to access the object",
+    },
+}
+
+
+# NETSUITE API ERRORS EXCEPTIONS
+class DateFormatExeption(Exception):
+    """API CANNOT HANDLE REQUEST USING GIVEN DATETIME FORMAT"""

airbyte-integrations/connectors/source-netsuite/source_netsuite/source.py

Lines changed: 58 additions & 30 deletions
@@ -5,6 +5,7 @@
 
 import logging
 from collections import Counter
+from json import JSONDecodeError
 from typing import Any, List, Mapping, Tuple, Union
 
 import requests
@@ -16,6 +17,9 @@
 
 
 class SourceNetsuite(AbstractSource):
+
+    logger: logging.Logger = logging.getLogger("airbyte")
+
     def auth(self, config: Mapping[str, Any]) -> OAuth1:
         return OAuth1(
             client_key=config["consumer_key"],
@@ -50,7 +54,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
         # check connectivity to all provided `object_types`
         for object in object_types:
             try:
-                response = session.get(url=base_url + RECORD_PATH + object, params={"limit": 1})
+                response = session.get(url=base_url + RECORD_PATH + object.lower(), params={"limit": 1})
                 response.raise_for_status()
                 return True, None
             except requests.exceptions.HTTPError as e:
@@ -67,11 +71,29 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
                 return False, e
 
     def get_schemas(self, object_names: Union[List[str], str], session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
-        # fetch schemas
-        if isinstance(object_names, list):
-            return {object_name: session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json() for object_name in object_names}
-        elif isinstance(object_names, str):
-            return {object_names: session.get(metadata_url + object_names, headers=SCHEMA_HEADERS).json()}
+        """
+        Handles multivariance of object_names type input and fetches the schema for each object type provided.
+        """
+        try:
+            if isinstance(object_names, list):
+                schemas = {}
+                for object_name in object_names:
+                    schemas.update(**self.fetch_schema(object_name, session, metadata_url))
+                return schemas
+            elif isinstance(object_names, str):
+                return self.fetch_schema(object_names, session, metadata_url)
+            else:
+                raise NotImplementedError(
+                    f"Object Types has unknown structure, should be either `dict` or `str`, actual input: {object_names}"
+                )
+        except JSONDecodeError as e:
+            self.logger.error(f"Unexpected output while fetching the object schema. Full error: {e.__repr__()}")
+
+    def fetch_schema(self, object_name: str, session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
+        """
+        Calls the API for specific object type and returns schema as a dict.
+        """
+        return {object_name.lower(): session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json()}
 
     def generate_stream(
         self,
@@ -83,35 +105,40 @@ def generate_stream(
         base_url: str,
         start_datetime: str,
         window_in_days: int,
+        max_retry: int = 3,
     ) -> Union[NetsuiteStream, IncrementalNetsuiteStream, CustomIncrementalNetsuiteStream]:
 
-        logger: logging.Logger = (logging.Logger,)
-
         input_args = {
             "auth": auth,
             "object_name": object_name,
             "base_url": base_url,
             "start_datetime": start_datetime,
             "window_in_days": window_in_days,
         }
-        try:
-            schema = schemas[object_name]
-            schema_props = schema["properties"]
-            if schema_props:
-                if INCREMENTAL_CURSOR in schema_props.keys():
-                    return IncrementalNetsuiteStream(**input_args)
-                elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
-                    return CustomIncrementalNetsuiteStream(**input_args)
-                else:
-                    # all other streams are full_refresh
-                    return NetsuiteStream(**input_args)
-        except KeyError:
-            logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry...")
-            # somethimes object metadata returns data with missing `properties` key,
-            # we should try to fetch metadata again to that object
-            schemas = self.get_schemas(object_name, session, metadata_url)
-            input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
-            return self.generate_stream(**input_args)
+
+        schema = schemas[object_name]
+        schema_props = schema.get("properties")
+        if schema_props:
+            if INCREMENTAL_CURSOR in schema_props.keys():
+                return IncrementalNetsuiteStream(**input_args)
+            elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
+                return CustomIncrementalNetsuiteStream(**input_args)
+            else:
+                # all other streams are full_refresh
+                return NetsuiteStream(**input_args)
+        else:
+            retry_attempt = 1
+            while retry_attempt <= max_retry:
+                self.logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry attempt: {retry_attempt}/{max_retry}")
+                # somethimes object metadata returns data with missing `properties` key,
+                # we should try to fetch metadata again to that object
                schemas = self.get_schemas(object_name, session, metadata_url)
+                if schemas[object_name].get("properties"):
+                    input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
+                    return self.generate_stream(**input_args)
+                retry_attempt += 1
+            self.logger.warn(f"Object `{object_name}` schema is not available. Skipping this stream.")
+            return None
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         auth = self.auth(config)
@@ -121,15 +148,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         object_names = config.get("object_types")
 
         # retrieve all record types if `object_types` config field is not specified
-        if not config.get("object_types"):
+        if not object_names:
             objects_metadata = session.get(metadata_url).json().get("items")
             object_names = [object["name"] for object in objects_metadata]
 
         input_args = {"session": session, "metadata_url": metadata_url}
         schemas = self.get_schemas(object_names, **input_args)
         input_args.update(
             **{
-                "auth": self.auth(config),
+                "auth": auth,
                 "base_url": base_url,
                 "start_datetime": config["start_datetime"],
                 "window_in_days": config["window_in_days"],
@@ -139,6 +166,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         # build streams
         streams: list = []
        for name in object_names:
-            streams.append(self.generate_stream(object_name=name, **input_args))
-
+            stream = self.generate_stream(object_name=name.lower(), **input_args)
+            if stream:
+                streams.append(stream)
         return streams
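
For illustration only (hypothetical helper, not in the commit): the cursor-based rule that generate_stream applies above when choosing a stream class for an object's schema, including the new skip-on-missing-schema behaviour.

from typing import Optional

# These cursor names mirror INCREMENTAL_CURSOR / CUSTOM_INCREMENTAL_CURSOR from constraints.py.
INCREMENTAL_CURSOR = "lastModifiedDate"
CUSTOM_INCREMENTAL_CURSOR = "lastmodified"

def pick_stream_class(schema_props: Optional[dict]) -> Optional[str]:
    # Mirrors the selection order in generate_stream.
    if not schema_props:
        return None  # stream is skipped once the schema retries are exhausted
    if INCREMENTAL_CURSOR in schema_props:
        return "IncrementalNetsuiteStream"
    if CUSTOM_INCREMENTAL_CURSOR in schema_props:
        return "CustomIncrementalNetsuiteStream"
    return "NetsuiteStream"

# pick_stream_class({"lastModifiedDate": {"type": "string"}}) -> "IncrementalNetsuiteStream"
# pick_stream_class({"id": {"type": "string"}})               -> "NetsuiteStream"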
