Skip to content

Commit 1855408

Browse files
authored
[ISSUE #7674] contacts_list_memberships incremental using client filt… (#38128)
1 parent 232b102 commit 1855408

File tree

8 files changed

+293
-5
lines changed

8 files changed

+293
-5
lines changed

airbyte-integrations/connectors/source-hubspot/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
13-
dockerImageTag: 4.1.2
13+
dockerImageTag: 4.1.3
1414
dockerRepository: airbyte/source-hubspot
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
1616
githubIssueLabel: source-hubspot

airbyte-integrations/connectors/source-hubspot/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.1.2"
6+
version = "4.1.3"
77
name = "source-hubspot"
88
description = "Source implementation for HubSpot."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin
891891
# save the state
892892
self.state = {self.cursor_field: int(max_state) if int_field_type else max_state}
893893
# emit the record if its cursor value is greater than or equal to the state (`True` only)
894-
return record_value > state_value
894+
return record_value >= state_value
895895

896896
def read_records(
897897
self,
@@ -1369,6 +1369,7 @@ class ContactsAllBase(Stream):
13691369
page_filter = "vidOffset"
13701370
page_field = "vid-offset"
13711371
primary_key = "canonical-vid"
1372+
limit_field = "count"
13721373
scopes = {"crm.objects.contacts.read"}
13731374
properties_scopes = {"crm.schemas.contacts.read"}
13741375
records_field = None
@@ -1393,7 +1394,7 @@ def request_params(
13931394
return params
13941395

13951396

1396-
class ContactsListMemberships(ContactsAllBase, ABC):
1397+
class ContactsListMemberships(ContactsAllBase, ClientSideIncrementalStream):
13971398
"""Contacts list Memberships, API v1
13981399
The Stream was created due to issue #8477, where supporting List Memberships in Contacts stream was requested.
13991400
According to the issue this feature is supported in API v1 by setting parameter showListMemberships=true
@@ -1407,6 +1408,16 @@ class ContactsListMemberships(ContactsAllBase, ABC):
14071408
filter_field = "showListMemberships"
14081409
filter_value = True
14091410

1411+
@property
1412+
def updated_at_field(self) -> str:
1413+
"""Name of the field associated with the state"""
1414+
return "timestamp"
1415+
1416+
@property
1417+
def cursor_field_datetime_format(self) -> str:
1418+
"""Cursor value expected to be a timestamp in milliseconds"""
1419+
return "x"
1420+
14101421

14111422
class ContactsFormSubmissions(ContactsAllBase, ABC):
14121423

airbyte-integrations/connectors/source-hubspot/unit_tests/integrations/request_builders/streams.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
from typing import Any, Dict, Iterable, Tuple
3+
from typing import Any, Dict, Iterable, List, Tuple
44

55
from airbyte_cdk.test.mock_http import HttpRequest
66

@@ -135,3 +135,37 @@ def with_page_token(self, next_page_token: Dict):
135135
def build(self):
136136
q = "&".join(filter(None, self._query_params))
137137
return HttpRequest(self.URL, query_params=q)
138+
139+
140+
# We only need to mock the Contacts endpoint because it serves the data extracted by ListMemberships, FormSubmissions and MergedAudit
141+
class ContactsStreamRequestBuilder(AbstractRequestBuilder):
    """Build the expected ``HttpRequest`` for the HubSpot v1 "all contacts" endpoint.

    The query string is assembled in a fixed order: the page size (``count=100``),
    the optional ``vidOffset`` pagination token, then every filter added through
    :meth:`with_filter`, in insertion order.
    """

    URL = "https://api.hubapi.com/contacts/v1/lists/all/contacts/all"

    def __init__(self) -> None:
        # Each filter is stored pre-rendered as "field=value".
        self._filters = []
        # Stays None until with_vid_offset() is called; None is dropped from the query.
        self._vid_offset = None

    @property
    def _count(self) -> str:
        """Page-size query parameter sent with every request."""
        return "count=100"

    def with_filter(self, filter_field: str, filter_value: Any) -> "ContactsStreamRequestBuilder":
        """Append ``filter_field=filter_value`` to the query string and return ``self``."""
        self._filters.append(f"{filter_field}={filter_value}")
        return self

    def with_vid_offset(self, vid_offset: str) -> "ContactsStreamRequestBuilder":
        """Set the ``vidOffset`` pagination parameter and return ``self``."""
        self._vid_offset = f"vidOffset={vid_offset}"
        return self

    @property
    def _query_params(self) -> List[str]:
        """Return the non-empty query parameters as a real list.

        A list (rather than a lazy ``filter`` object) matches the declared return
        type and can safely be iterated more than once.
        """
        candidates = [self._count, self._vid_offset, *self._filters]
        return [param for param in candidates if param]

    def build(self) -> HttpRequest:
        """Join the query parameters with ``&`` and wrap them in an ``HttpRequest``."""
        return HttpRequest(self.URL, query_params="&".join(self._query_params))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
import json
4+
from datetime import datetime
5+
from typing import Any, Dict, List, Optional
6+
7+
from airbyte_cdk.test.mock_http import HttpResponse
8+
from airbyte_cdk.test.mock_http.response_builder import find_template
9+
10+
# Keys used to navigate the `all_contacts` response template.
_CONTACTS_FIELD = "contacts"
_LIST_MEMBERSHIPS_FIELD = "list-memberships"
12+
13+
14+
def _get_template() -> Dict[str, Any]:
    """Look up the ``all_contacts`` response template relative to this file."""
    # NOTE(review): the builders below mutate what this returns, so this presumably
    # yields a fresh object on every call - confirm against `find_template`.
    template = find_template("all_contacts", __file__)
    return template
16+
17+
18+
class ContactsListMembershipBuilder:
19+
def __init__(self) -> None:
20+
self._template: Dict[str, Any] = _get_template()[_CONTACTS_FIELD][0][_LIST_MEMBERSHIPS_FIELD][0]
21+
22+
def with_timestamp(self, timestamp: datetime) -> "ContactsListMembershipBuilder":
23+
self._template["timestamp"] = int(timestamp.timestamp() * 1000)
24+
return self
25+
26+
def build(self) -> Dict[str, Any]:
27+
return self._template
28+
29+
30+
class ContactBuilder:
31+
def __init__(self) -> None:
32+
self._template: Dict[str, Any] = _get_template()[_CONTACTS_FIELD][0]
33+
34+
def with_list_memberships(self, memberships: List[ContactsListMembershipBuilder]) -> "ContactBuilder":
35+
self._template[_LIST_MEMBERSHIPS_FIELD] = [membership.build() for membership in memberships]
36+
return self
37+
38+
def build(self) -> Dict[str, Any]:
39+
return self._template
40+
41+
42+
class AllContactsResponseBuilder:
43+
def __init__(self) -> None:
44+
self._contacts = []
45+
self._vid_offset = 0
46+
self._has_more = False
47+
48+
def with_contacts(self, contacts: List[ContactBuilder]) -> "AllContactsResponseBuilder":
49+
self._contacts = [contact.build() for contact in contacts]
50+
return self
51+
52+
def with_pagination(self, vid_offset: int) -> "AllContactsResponseBuilder":
53+
self._has_more = True
54+
self._vid_offset = vid_offset
55+
return self
56+
57+
def build(self) -> HttpResponse:
58+
template = {
59+
"contacts": self._contacts,
60+
"has-more": self._has_more,
61+
"vid-offset": self._vid_offset,
62+
}
63+
64+
return HttpResponse(json.dumps(template), 200)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
from datetime import datetime, timedelta, timezone
3+
from unittest import TestCase
4+
5+
import freezegun
6+
from airbyte_cdk.test.mock_http import HttpMocker
7+
from airbyte_cdk.test.state_builder import StateBuilder
8+
from airbyte_protocol.models import SyncMode
9+
10+
from . import HubspotTestCase
11+
from .request_builders.streams import ContactsStreamRequestBuilder
12+
from .response_builder.contact_response_builder import AllContactsResponseBuilder, ContactBuilder, ContactsListMembershipBuilder
13+
14+
# Start date early enough that no mocked record is dropped by the start-date filter.
_START_TIME_BEFORE_ANY_RECORD = "1970-01-01T00:00:00Z"

# NOTE(review): not referenced by any test visible in this file - confirm whether it is still needed.
_NOW = datetime.now(timezone.utc)
# Pagination token returned by the first mocked page and sent back as `vidOffset`.
_VID_OFFSET = 5331889818
18+
19+
20+
class ContactsListMembershipsStreamTest(TestCase, HubspotTestCase):
21+
SCOPES = ["crm.objects.contacts.read"]
22+
STREAM_NAME = "contacts_list_memberships"
23+
24+
def setUp(self) -> None:
25+
self._http_mocker = HttpMocker()
26+
self._http_mocker.__enter__()
27+
28+
self.mock_oauth(self._http_mocker, self.ACCESS_TOKEN)
29+
self.mock_scopes(self._http_mocker, self.ACCESS_TOKEN, self.SCOPES)
30+
self.mock_custom_objects(self._http_mocker)
31+
32+
def tearDown(self) -> None:
33+
self._http_mocker.__exit__(None, None, None)
34+
35+
def test_given_pagination_when_read_then_extract_records_from_both_pages(self) -> None:
36+
self.mock_response(
37+
self._http_mocker,
38+
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
39+
AllContactsResponseBuilder().with_pagination(vid_offset=_VID_OFFSET).with_contacts([
40+
ContactBuilder().with_list_memberships([
41+
ContactsListMembershipBuilder(),
42+
ContactsListMembershipBuilder(),
43+
]),
44+
]).build(),
45+
)
46+
self.mock_response(
47+
self._http_mocker,
48+
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).with_vid_offset(str(_VID_OFFSET)).build(),
49+
AllContactsResponseBuilder().with_contacts([
50+
ContactBuilder().with_list_memberships([
51+
ContactsListMembershipBuilder(),
52+
]),
53+
ContactBuilder().with_list_memberships([
54+
ContactsListMembershipBuilder(),
55+
ContactsListMembershipBuilder(),
56+
]),
57+
]).build(),
58+
)
59+
60+
output = self.read_from_stream(self.oauth_config(start_date=_START_TIME_BEFORE_ANY_RECORD), self.STREAM_NAME, SyncMode.full_refresh)
61+
62+
assert len(output.records) == 5
63+
64+
def test_given_timestamp_before_start_date_when_read_then_filter_out(self) -> None:
65+
start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
66+
self.mock_response(
67+
self._http_mocker,
68+
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
69+
AllContactsResponseBuilder().with_contacts([
70+
ContactBuilder().with_list_memberships([
71+
ContactsListMembershipBuilder().with_timestamp(start_date + timedelta(days=10)),
72+
ContactsListMembershipBuilder().with_timestamp(start_date - timedelta(days=10)),
73+
]),
74+
]).build(),
75+
)
76+
output = self.read_from_stream(self.oauth_config(start_date=start_date.isoformat().replace("+00:00", "Z")), self.STREAM_NAME, SyncMode.full_refresh)
77+
78+
assert len(output.records) == 1
79+
80+
def test_given_state_when_read_then_filter_out(self) -> None:
81+
state_value = datetime(2024, 1, 1, tzinfo=timezone.utc)
82+
self.mock_response(
83+
self._http_mocker,
84+
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
85+
AllContactsResponseBuilder().with_contacts([
86+
ContactBuilder().with_list_memberships([
87+
ContactsListMembershipBuilder().with_timestamp(state_value + timedelta(days=10)),
88+
ContactsListMembershipBuilder().with_timestamp(state_value - timedelta(days=10)),
89+
]),
90+
]).build(),
91+
)
92+
output = self.read_from_stream(
93+
self.oauth_config(start_date=_START_TIME_BEFORE_ANY_RECORD),
94+
self.STREAM_NAME,
95+
SyncMode.incremental,
96+
StateBuilder().with_stream_state(self.STREAM_NAME, {"timestamp": int(state_value.timestamp() * 1000)}).build(),
97+
)
98+
99+
assert len(output.records) == 1
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
{
2+
"contacts": [
3+
{
4+
"vid": 5331889818,
5+
"canonical-vid": 5331889818,
6+
"merged-vids": [],
7+
"portal-id": 8727216,
8+
"is-contact": true,
9+
"properties": {
10+
"firstname": {
11+
"value": "Eric"
12+
},
13+
"lastmodifieddate": {
14+
"value": "1713702782727"
15+
},
16+
"lastname": {
17+
"value": "Harris"
18+
}
19+
},
20+
"form-submissions": [
21+
{
22+
"conversion-id": "2c7b8485-62e8-41e3-95a1-ac429358201c",
23+
"timestamp": 1714546800000,
24+
"form-id": "8372afd9-33b9-458e-ac20-58665140554b",
25+
"portal-id": 8727216,
26+
"title": "When Will We Land? - Barry Can't Swim",
27+
"form-type": "HUBSPOT",
28+
"contact-associated-by": ["EMAIL"],
29+
"meta-data": []
30+
}
31+
],
32+
"list-memberships": [
33+
{
34+
"static-list-id": 166,
35+
"internal-list-id": 2147483643,
36+
"timestamp": 1714546800000,
37+
"vid": 5331890527,
38+
"is-member": true
39+
},
40+
{
41+
"static-list-id": 167,
42+
"internal-list-id": 2147483644,
43+
"timestamp": 1714546800000,
44+
"vid": 5331890528,
45+
"is-member": true
46+
},
47+
{
48+
"static-list-id": 167,
49+
"internal-list-id": 2147483645,
50+
"timestamp": 1714546800000,
51+
"vid": 5331890529,
52+
"is-member": true
53+
}
54+
],
55+
"identity-profiles": [
56+
{
57+
"vid": 5331889818,
58+
"saved-at-timestamp": 1711049462684,
59+
"deleted-changed-timestamp": 0,
60+
"identities": [
61+
{
62+
"type": "EMAIL",
63+
"value": "[email protected]",
64+
"timestamp": 1711049462548,
65+
"is-primary": true
66+
},
67+
{
68+
"type": "LEAD_GUID",
69+
"value": "33671d61-e5fe-43c0-84ff-2ad08329b008",
70+
"timestamp": 1711049462680
71+
}
72+
]
73+
}
74+
],
75+
"merge-audits": [],
76+
"addedAt": 1711049462684
77+
}
78+
]
79+
}

docs/integrations/sources/hubspot.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ The connector is restricted by normal HubSpot [rate limitations](https://legacyd
334334

335335
| Version | Date | Pull Request | Subject |
336336
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
337+
| 4.1.3 | 2024-05-13 | [38128](https://github.com/airbytehq/airbyte/pull/38128) | contacts_list_memberships as semi-incremental stream |
337338
| 4.1.2 | 2024-04-24 | [36642](https://github.com/airbytehq/airbyte/pull/36642) | Schema descriptions and CDK 0.80.0 |
338339
| 4.1.1 | 2024-04-11 | [35945](https://github.com/airbytehq/airbyte/pull/35945) | Add integration tests |
339340
| 4.1.0 | 2024-03-27 | [36541](https://github.com/airbytehq/airbyte/pull/36541) | Added test configuration features, fixed type hints |

0 commit comments

Comments
 (0)