Skip to content

[ISSUE #7674] contacts_list_memberships incremental using client filt… #38128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin
# save the state
self.state = {self.cursor_field: int(max_state) if int_field_type else max_state}
# emmit record if it has bigger cursor value compare to the state (`True` only)
return record_value > state_value
return record_value >= state_value
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very edge case but we had a similar discussion for Salesforce here


def read_records(
self,
Expand Down Expand Up @@ -1369,6 +1369,7 @@ class ContactsAllBase(Stream):
page_filter = "vidOffset"
page_field = "vid-offset"
primary_key = "canonical-vid"
limit_field = "count"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation was using limit instead of count. This should have no impact as we pass the default value which is 100

scopes = {"crm.objects.contacts.read"}
properties_scopes = {"crm.schemas.contacts.read"}
records_field = None
Expand All @@ -1393,7 +1394,7 @@ def request_params(
return params


class ContactsListMemberships(ContactsAllBase, ABC):
class ContactsListMemberships(ContactsAllBase, ClientSideIncrementalStream):
"""Contacts list Memberships, API v1
The Stream was created due to issue #8477, where supporting List Memberships in Contacts stream was requested.
According to the issue this feature is supported in API v1 by setting parameter showListMemberships=true
Expand All @@ -1407,6 +1408,15 @@ class ContactsListMemberships(ContactsAllBase, ABC):
filter_field = "showListMemberships"
filter_value = True

@property
def updated_at_field(self):
"""Name of the field associated with the state"""
return "timestamp"

@property
def cursor_field_datetime_format(self):
return "x"


class ContactsFormSubmissions(ContactsAllBase, ABC):

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import Any, Dict, Iterable, Tuple
from typing import Any, Dict, Iterable, List, Tuple

from airbyte_cdk.test.mock_http import HttpRequest

Expand Down Expand Up @@ -135,3 +135,37 @@ def with_page_token(self, next_page_token: Dict):
def build(self):
q = "&".join(filter(None, self._query_params))
return HttpRequest(self.URL, query_params=q)


# We only need to mock the Contacts endpoint because it services the data extracted by ListMemberships, FormSubmissions, MergedAudit
class ContactsStreamRequestBuilder(AbstractRequestBuilder):
URL = "https://api.hubapi.com/contacts/v1/lists/all/contacts/all"

def __init__(self) -> None:
self._filters = []
self._vid_offset = None

@property
def _count(self) -> str:
return "count=100"

def with_filter(self, filter_field: str, filter_value: Any) -> "ContactsStreamRequestBuilder":
self._filters.append(f"{filter_field}={filter_value}")
return self

def with_vid_offset(self, vid_offset: str) -> "ContactsStreamRequestBuilder":
self._vid_offset = f"vidOffset={vid_offset}"
return self

@property
def _query_params(self) -> List[str]:
params = [
self._count,
self._vid_offset,
]
params.extend(self._filters)
return filter(None, params)

def build(self) -> HttpRequest:
q = "&".join(filter(None, self._query_params))
return HttpRequest(self.URL, query_params=q)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

from airbyte_cdk.test.mock_http import HttpResponse
from airbyte_cdk.test.mock_http.response_builder import find_template

_CONTACTS_FIELD = "contacts"
_LIST_MEMBERSHIPS_FIELD = "list-memberships"


def _get_template() -> Dict[str, Any]:
return find_template("all_contacts", __file__)


class ContactsListMembershipBuilder:
def __init__(self) -> None:
self._template: Dict[str, Any] = _get_template()[_CONTACTS_FIELD][0][_LIST_MEMBERSHIPS_FIELD][0]

def with_timestamp(self, timestamp: datetime) -> "ContactsListMembershipBuilder":
self._template["timestamp"] = int(timestamp.timestamp() * 1000)
return self

def build(self) -> Dict[str, Any]:
return self._template


class ContactBuilder:
def __init__(self) -> None:
self._template: Dict[str, Any] = _get_template()[_CONTACTS_FIELD][0]

def with_list_memberships(self, memberships: List[ContactsListMembershipBuilder]) -> "ContactBuilder":
self._template[_LIST_MEMBERSHIPS_FIELD] = [membership.build() for membership in memberships]
return self

def build(self) -> Dict[str, Any]:
return self._template


class AllContactsResponseBuilder:
def __init__(self) -> None:
self._contacts = []
self._vid_offset = 0
self._has_more = False

def with_contacts(self, contacts: List[ContactBuilder]) -> "AllContactsResponseBuilder":
self._contacts = [contact.build() for contact in contacts]
return self

def with_pagination(self, vid_offset: int) -> "AllContactsResponseBuilder":
self._has_more = True
self._vid_offset = vid_offset
return self

def build(self) -> HttpResponse:
template = {
"contacts": self._contacts,
"has-more": self._has_more,
"vid-offset": self._vid_offset,
}

return HttpResponse(json.dumps(template), 200)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from datetime import datetime, timezone, timedelta
from unittest import TestCase

import freezegun
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import SyncMode

from . import HubspotTestCase
from .request_builders.streams import ContactsStreamRequestBuilder
from .response_builder.contact_response_builder import AllContactsResponseBuilder, ContactBuilder, ContactsListMembershipBuilder

_START_TIME_BEFORE_ANY_RECORD = "1970-01-01T00:00:00Z"

_NOW = datetime.now(timezone.utc)
_VID_OFFSET = 5331889818


class ContactsListMembershipsStreamTest(TestCase, HubspotTestCase):
SCOPES = ["crm.objects.contacts.read"]
STREAM_NAME = "contacts_list_memberships"

def setUp(self) -> None:
self._http_mocker = HttpMocker()
self._http_mocker.__enter__()

self.mock_oauth(self._http_mocker, self.ACCESS_TOKEN)
self.mock_scopes(self._http_mocker, self.ACCESS_TOKEN, self.SCOPES)
self.mock_custom_objects(self._http_mocker)

def tearDown(self) -> None:
self._http_mocker.__exit__(None, None, None)

def test_given_pagination_when_read_then_extract_records_from_both_pages(self) -> None:
self.mock_response(
self._http_mocker,
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
AllContactsResponseBuilder().with_pagination(vid_offset=_VID_OFFSET).with_contacts([
ContactBuilder().with_list_memberships([
ContactsListMembershipBuilder(),
ContactsListMembershipBuilder(),
]),
]).build(),
)
self.mock_response(
self._http_mocker,
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).with_vid_offset(str(_VID_OFFSET)).build(),
AllContactsResponseBuilder().with_contacts([
ContactBuilder().with_list_memberships([
ContactsListMembershipBuilder(),
]),
ContactBuilder().with_list_memberships([
ContactsListMembershipBuilder(),
ContactsListMembershipBuilder(),
]),
]).build(),
)

output = self.read_from_stream(self.oauth_config(start_date=_START_TIME_BEFORE_ANY_RECORD), self.STREAM_NAME, SyncMode.full_refresh)

assert len(output.records) == 5

def test_given_timestamp_before_start_date_when_read_then_filter_out(self) -> None:
start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
self.mock_response(
self._http_mocker,
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
AllContactsResponseBuilder().with_contacts([
ContactBuilder().with_list_memberships([
ContactsListMembershipBuilder().with_timestamp(start_date + timedelta(days=10)),
ContactsListMembershipBuilder().with_timestamp(start_date - timedelta(days=10)),
]),
]).build(),
)
output = self.read_from_stream(self.oauth_config(start_date=start_date.isoformat().replace("+00:00", "Z")), self.STREAM_NAME, SyncMode.full_refresh)

assert len(output.records) == 1

def test_given_state_when_read_then_filter_out(self) -> None:
state_value = datetime(2024, 1, 1, tzinfo=timezone.utc)
self.mock_response(
self._http_mocker,
ContactsStreamRequestBuilder().with_filter("showListMemberships", True).build(),
AllContactsResponseBuilder().with_contacts([
ContactBuilder().with_list_memberships([
ContactsListMembershipBuilder().with_timestamp(state_value + timedelta(days=10)),
ContactsListMembershipBuilder().with_timestamp(state_value - timedelta(days=10)),
]),
]).build(),
)
output = self.read_from_stream(
self.oauth_config(start_date=_START_TIME_BEFORE_ANY_RECORD),
self.STREAM_NAME,
SyncMode.incremental,
StateBuilder().with_stream_state(self.STREAM_NAME, {"timestamp": int(state_value.timestamp() * 1000)}).build(),
)

assert len(output.records) == 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"contacts": [
{
"vid": 5331889818,
"canonical-vid": 5331889818,
"merged-vids": [],
"portal-id": 8727216,
"is-contact": true,
"properties": {
"firstname": {
"value": "Eric"
},
"lastmodifieddate": {
"value": "1713702782727"
},
"lastname": {
"value": "Harris"
}
},
"form-submissions": [
{
"conversion-id": "2c7b8485-62e8-41e3-95a1-ac429358201c",
"timestamp": 1714546800000,
"form-id": "8372afd9-33b9-458e-ac20-58665140554b",
"portal-id": 8727216,
"title": "When Will We Land? - Barry Can't Swim",
"form-type": "HUBSPOT",
"contact-associated-by": ["EMAIL"],
"meta-data": []
}
],
"list-memberships": [
{
"static-list-id": 166,
"internal-list-id": 2147483643,
"timestamp": 1714546800000,
"vid": 5331890527,
"is-member": true
},
{
"static-list-id": 167,
"internal-list-id": 2147483644,
"timestamp": 1714546800000,
"vid": 5331890528,
"is-member": true
},
{
"static-list-id": 167,
"internal-list-id": 2147483645,
"timestamp": 1714546800000,
"vid": 5331890529,
"is-member": true
}
],
"identity-profiles": [
{
"vid": 5331889818,
"saved-at-timestamp": 1711049462684,
"deleted-changed-timestamp": 0,
"identities": [
{
"type": "EMAIL",
"value": "[email protected]",
"timestamp": 1711049462548,
"is-primary": true
},
{
"type": "LEAD_GUID",
"value": "33671d61-e5fe-43c0-84ff-2ad08329b008",
"timestamp": 1711049462680
}
]
}
],
"merge-audits": [],
"addedAt": 1711049462684
}
]
}
Loading