Skip to content

Commit 09d6901

Browse files
Source Hubspot: fix 401 for associations (#15156)
* Revert "Source Hubspot: revert v0.1.78 (#15144)" This reverts commit cbdb897. * #379 source hubspot: fix 401 when reading associations * #379 source hubspot: fix 401 when reading associations * #379 source hubspot: upd changelog * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 9dd045d commit 09d6901

File tree

9 files changed

+161
-18
lines changed

9 files changed

+161
-18
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@
414414
- name: HubSpot
415415
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
416416
dockerRepository: airbyte/source-hubspot
417-
dockerImageTag: 0.1.79
417+
dockerImageTag: 0.1.80
418418
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
419419
icon: hubspot.svg
420420
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3705,7 +3705,7 @@
37053705
supportsNormalization: false
37063706
supportsDBT: false
37073707
supported_destination_sync_modes: []
3708-
- dockerImage: "airbyte/source-hubspot:0.1.79"
3708+
- dockerImage: "airbyte/source-hubspot:0.1.80"
37093709
spec:
37103710
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
37113711
connectionSpecification:

airbyte-integrations/connectors/source-hubspot/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
3434
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
3535
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3636

37-
LABEL io.airbyte.version=0.1.79
37+
LABEL io.airbyte.version=0.1.80
3838
LABEL io.airbyte.name=airbyte/source-hubspot
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import json
6+
7+
import pytest
8+
9+
10+
@pytest.fixture(scope="session", name="config")
11+
def config_fixture():
12+
with open("secrets/config.json", "r") as config_file:
13+
return json.load(config_file)
14+
15+
16+
@pytest.fixture(scope="session", name="oauth_config")
17+
def oauth_config_fixture():
18+
with open("secrets/config_oauth.json", "r") as config_file:
19+
return json.load(config_file)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import logging
6+
7+
import pytest
8+
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
9+
from source_hubspot.source import SourceHubspot
10+
11+
12+
@pytest.fixture
13+
def source():
14+
return SourceHubspot()
15+
16+
17+
@pytest.fixture
18+
def associations(config, source):
19+
streams = source.streams(config)
20+
return {stream.name: getattr(stream, "associations", []) for stream in streams}
21+
22+
23+
@pytest.fixture
24+
def configured_catalog(config, source):
25+
streams = source.streams(config)
26+
return {
27+
"streams": [
28+
{
29+
"stream": stream.as_airbyte_stream(),
30+
"sync_mode": "incremental",
31+
"cursor_field": [stream.cursor_field],
32+
"destination_sync_mode": "append",
33+
}
34+
for stream in streams
35+
if stream.supports_incremental and getattr(stream, "associations", [])
36+
]
37+
}
38+
39+
40+
@pytest.mark.parametrize("auth", ("api_key", "oauth"))
41+
def test_incremental_read_fetches_associations(auth, config, oauth_config, configured_catalog, source, associations):
42+
configuration = oauth_config if auth == "oauth" else config
43+
messages = source.read(logging.getLogger("airbyte"), configuration, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {})
44+
45+
association_found = False
46+
for message in messages:
47+
if message and message.type != Type.RECORD:
48+
continue
49+
record = message.record
50+
stream, data = record.stream, record.data
51+
# assume at least one association id is present
52+
stream_associations = associations[stream]
53+
for association in stream_associations:
54+
if data.get(association):
55+
association_found = True
56+
break
57+
assert association_found

airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from airbyte_cdk.utils.event_timing import create_timer
1616
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1717
from requests import HTTPError
18-
from source_hubspot.constants import API_KEY_CREDENTIALS, OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS
18+
from source_hubspot.constants import API_KEY_CREDENTIALS
1919
from source_hubspot.streams import (
2020
API,
2121
Campaigns,
@@ -82,15 +82,10 @@ def get_api(config: Mapping[str, Any]) -> API:
8282
return API(credentials=credentials)
8383

8484
def get_common_params(self, config) -> Mapping[str, Any]:
85-
start_date = config.get("start_date")
85+
start_date = config["start_date"]
8686
credentials = config["credentials"]
8787
api = self.get_api(config=config)
88-
common_params = dict(api=api, start_date=start_date, credentials=credentials)
89-
90-
credentials_title = credentials.get("credentials_title")
91-
if credentials_title == OAUTH_CREDENTIALS or credentials_title == PRIVATE_APP_CREDENTIALS:
92-
common_params["authenticator"] = api.get_authenticator()
93-
return common_params
88+
return dict(api=api, start_date=start_date, credentials=credentials)
9489

9590
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
9691
credentials = config.get("credentials", {})

airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,19 @@ def _property_wrapper(self) -> IURLPropertyRepresentation:
245245
return APIv1Property(properties)
246246
return APIv3Property(properties)
247247

248-
def __init__(self, api: API, start_date: str = None, credentials: Mapping[str, Any] = None, **kwargs):
248+
def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credentials: Mapping[str, Any] = None, **kwargs):
249249
super().__init__(**kwargs)
250250
self._api: API = api
251-
self._start_date = pendulum.parse(start_date)
251+
self._credentials = credentials
252252

253-
if credentials["credentials_title"] == API_KEY_CREDENTIALS:
253+
self._start_date = start_date
254+
if isinstance(self._start_date, str):
255+
self._start_date = pendulum.parse(self._start_date)
256+
creds_title = self._credentials["credentials_title"]
257+
if creds_title == API_KEY_CREDENTIALS:
254258
self._session.params["hapikey"] = credentials.get("api_key")
259+
elif creds_title in (OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS):
260+
self._authenticator = api.get_authenticator()
255261

256262
def backoff_time(self, response: requests.Response) -> Optional[float]:
257263
if response.status_code == codes.too_many_requests:
@@ -642,6 +648,51 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
642648
yield record
643649

644650

651+
class AssociationsStream(Stream):
652+
"""
653+
Designed to read associations of CRM objects during incremental syncs, since Search API does not support
654+
retrieving associations.
655+
"""
656+
657+
http_method = "POST"
658+
filter_old_records = False
659+
660+
def __init__(self, parent_stream: Stream, identifiers: Iterable[Union[int, str]], *args, **kwargs):
661+
super().__init__(*args, **kwargs)
662+
self.parent_stream = parent_stream
663+
self.identifiers = identifiers
664+
665+
@property
666+
def url(self):
667+
"""
668+
although it is not used, it needs to be implemented because it is an abstract property
669+
"""
670+
return ""
671+
672+
def path(
673+
self,
674+
*,
675+
stream_state: Mapping[str, Any] = None,
676+
stream_slice: Mapping[str, Any] = None,
677+
next_page_token: Mapping[str, Any] = None,
678+
) -> str:
679+
return f"/crm/v4/associations/{self.parent_stream.entity}/{stream_slice}/batch/read"
680+
681+
def scopes(self) -> Set[str]:
682+
return self.parent_stream.scopes
683+
684+
def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[str]:
685+
return self.parent_stream.associations
686+
687+
def request_body_json(
688+
self,
689+
stream_state: Mapping[str, Any],
690+
stream_slice: Mapping[str, Any] = None,
691+
next_page_token: Mapping[str, Any] = None,
692+
) -> Optional[Mapping]:
693+
return {"inputs": [{"id": str(id_)} for id_ in self.identifiers]}
694+
695+
645696
class IncrementalStream(Stream, ABC):
646697
"""Stream that supports state and incremental read"""
647698

@@ -807,6 +858,24 @@ def _process_search(
807858

808859
return list(stream_records.values()), raw_response
809860

861+
def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
862+
records_by_pk = {record[self.primary_key]: record for record in records}
863+
identifiers = list(map(lambda x: x[self.primary_key], records))
864+
associations_stream = AssociationsStream(
865+
api=self._api, start_date=self._start_date, credentials=self._credentials, parent_stream=self, identifiers=identifiers
866+
)
867+
slices = associations_stream.stream_slices(sync_mode=SyncMode.full_refresh)
868+
869+
for _slice in slices:
870+
logger.info(f"Reading {_slice} associations of {self.entity}")
871+
associations = associations_stream.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
872+
for group in associations:
873+
current_record = records_by_pk[group["from"]["id"]]
874+
associations_list = current_record.get(_slice, [])
875+
associations_list.extend(association["toObjectId"] for association in group["to"])
876+
current_record[_slice] = associations_list
877+
return records_by_pk.values()
878+
810879
def read_records(
811880
self,
812881
sync_mode: SyncMode,
@@ -826,15 +895,15 @@ def read_records(
826895
stream_state=stream_state,
827896
stream_slice=stream_slice,
828897
)
829-
898+
records = self._read_associations(records)
830899
else:
831900
records, raw_response = self._read_stream_records(
832901
stream_slice=stream_slice,
833902
stream_state=stream_state,
834903
next_page_token=next_page_token,
835904
)
905+
records = self._flat_associations(records)
836906
records = self._filter_old_records(records)
837-
records = self._flat_associations(records)
838907

839908
for record in records:
840909
cursor = self._field_to_datetime(record[self.updated_at_field])

airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_check_connection_empty_config(config):
5050
def test_check_connection_invalid_config(config):
5151
config.pop("start_date")
5252

53-
with pytest.raises(TypeError):
53+
with pytest.raises(KeyError):
5454
SourceHubspot().check_connection(logger, config=config)
5555

5656

@@ -406,6 +406,8 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
406406
requests_mock.register_uri("POST", test_stream.url, responses)
407407
test_stream._sync_mode = None
408408
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
409+
requests_mock.register_uri("POST", "/crm/v4/associations/company/contacts/batch/read", [{"status_code": 200, "json": {"results": []}}])
410+
409411
records, _ = read_incremental(test_stream, {})
410412
# The stream should not attempt to get more than 10K records.
411413
# Instead, it should use the new state to start a new search query.

docs/integrations/sources/hubspot.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ Now that you have set up the HubSpot source connector, check out the following H
129129

130130
| Version | Date | Pull Request | Subject |
131131
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
132-
| 0.1.79 | 2022-07-28 | [15144](https://github.com/airbytehq/airbyte/pull/15144) | Revert v0.1.78 due to permission issues |
132+
| 0.1.80 | 2022-08-01 | [15156](https://github.com/airbytehq/airbyte/pull/15156) | Fix 401 error while retrieving associations using OAuth |
133+
| 0.1.79 | 2022-07-28 | [15144](https://github.com/airbytehq/airbyte/pull/15144) | Revert v0.1.78 due to permission issues |
133134
| 0.1.78 | 2022-07-28 | [15099](https://github.com/airbytehq/airbyte/pull/15099) | Fix to fetch associations when using incremental mode |
134135
| 0.1.77 | 2022-07-26 | [15035](https://github.com/airbytehq/airbyte/pull/15035) | Make PropertyHistory stream read historic data not limited to 30 days |
135136
| 0.1.76 | 2022-07-25 | [14999](https://github.com/airbytehq/airbyte/pull/14999) | Partially revert changes made in v0.1.75 |

0 commit comments

Comments
 (0)