Skip to content

Source Hubspot - Log instructions to update scopes when hitting 403 HTTP error #12515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 3, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.deprecated.base_source import ConfiguredAirbyteStream
from airbyte_cdk.sources.streams import Stream
Expand Down Expand Up @@ -48,17 +48,14 @@
class SourceHubspot(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""Check connection"""
alive = True
error_msg = None
common_params = self.get_common_params(config=config)
try:
contacts = Contacts(**common_params)
_ = contacts.properties
for stream in self.streams(config=config):
next(stream.read_records(sync_mode=SyncMode.full_refresh))
except HTTPError as error:
alive = False
error_msg = repr(error)

return alive, error_msg
return False, error_msg
return True, None

@staticmethod
def get_api(config: Mapping[str, Any]) -> API:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,32 +331,41 @@ def read_records(
pagination_complete = False

next_page_token = None
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:

properties_list = list(self.properties.keys())
if properties_list:
stream_records, response = self._read_stream_records(
properties_list=properties_list,
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
records = [value for key, value in stream_records.items()]
else:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)
yield from records

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
try:
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:

properties_list = list(self.properties.keys())
if properties_list:
stream_records, response = self._read_stream_records(
properties_list=properties_list,
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
records = [value for key, value in stream_records.items()]
else:
response = self.handle_request(
stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token
)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)
yield from records

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
if status_code == 403:
raise RuntimeError("Invalid permissions. Please ensure the all scopes are authorized for.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should say the stream name that broke here just so the user has actionable feedback

else:
raise e

@staticmethod
def _convert_datetime_to_string(dt: pendulum.datetime, declared_format: str = None) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_check_connection_ok(requests_mock, config):
{"json": [], "status_code": 200},
]

requests_mock.register_uri("GET", "/properties/v2/contact/properties", responses)
requests_mock.register_uri(test_check_connection_ok, test_check_connection_ok, responses)
ok, error_msg = SourceHubspot().check_connection(logger, config=config)

assert ok
Expand Down Expand Up @@ -119,7 +119,7 @@ def test_check_connection_backoff_on_limit_reached(requests_mock, config):
{"json": [], "status_code": 200},
]

requests_mock.register_uri("GET", "/properties/v2/contact/properties", responses)
requests_mock.register_uri(requests_mock.ANY, requests_mock.ANY, responses)
source = SourceHubspot()
alive, error = source.check_connection(logger=logger, config=config)

Expand All @@ -133,7 +133,8 @@ def test_check_connection_backoff_on_server_error(requests_mock, config):
{"json": {"error": "something bad"}, "status_code": 500},
{"json": [], "status_code": 200},
]
requests_mock.register_uri("GET", "/properties/v2/contact/properties", responses)
requests_mock.register_uri("GET", requests_mock.ANY, responses)
requests_mock.register_uri("POST", requests_mock.ANY, responses)
source = SourceHubspot()
alive, error = source.check_connection(logger=logger, config=config)

Expand Down Expand Up @@ -360,7 +361,7 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
"results": [{"id": f"{y}", "updatedAt": "2022-02-25T16:43:11Z"} for y in range(100)],
"paging": {
"next": {
"after": f"{x*100}",
"after": f"{x * 100}",
}
},
},
Expand All @@ -376,7 +377,7 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
"results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)],
"paging": {
"next": {
"after": f"{x*100}",
"after": f"{x * 100}",
}
},
},
Expand Down