Skip to content

Commit 6cf1f09

Browse files
authored
🐛 Source Hubspot: Fixed engagements pagination (#11266)
* Fixed engagement stream pagination * Added unit test * Removed unused import * Fixed issue if incremental engagements attempts to get more than 10K records * Fixed comment * Merged import statement in unit test
1 parent 893e5de commit 6cf1f09

File tree

2 files changed

+156
-5
lines changed

2 files changed

+156
-5
lines changed

airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -824,7 +824,7 @@ def read_records(
824824
if not next_page_token:
825825
pagination_complete = True
826826
elif self.state and next_page_token["payload"]["after"] >= 10000:
827-
# Hubspot documentations states that the search endpoints are limited to 10,000 total results
827+
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
828828
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
829829
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
830830
# start a new search query with the latest state that has been collected.
@@ -1065,7 +1065,6 @@ class Engagements(IncrementalStream):
10651065

10661066
url = "/engagements/v1/engagements/paged"
10671067
more_key = "hasMore"
1068-
limit = 250
10691068
updated_at_field = "lastUpdated"
10701069
created_at_field = "createdAt"
10711070
primary_key = "id"
@@ -1085,10 +1084,59 @@ def request_params(
10851084
stream_slice: Mapping[str, Any] = None,
10861085
next_page_token: Mapping[str, Any] = None,
10871086
) -> MutableMapping[str, Any]:
1088-
params = {self.limit_field: self.limit}
1087+
params = {"count": 250}
1088+
if next_page_token:
1089+
params["offset"] = next_page_token["offset"]
10891090
if self.state:
1090-
params["since"] = int(self._state.timestamp() * 1000)
1091+
params.update({"since": int(self._state.timestamp() * 1000), "count": 100})
10911092
return params
1093+
1094+
def stream_slices(
1095+
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
1096+
) -> Iterable[Optional[Mapping[str, Any]]]:
1097+
return [None]
1098+
1099+
def read_records(
1100+
self,
1101+
sync_mode: SyncMode,
1102+
cursor_field: List[str] = None,
1103+
stream_slice: Mapping[str, Any] = None,
1104+
stream_state: Mapping[str, Any] = None,
1105+
) -> Iterable[Mapping[str, Any]]:
1106+
stream_state = stream_state or {}
1107+
pagination_complete = False
1108+
1109+
next_page_token = None
1110+
latest_cursor = None
1111+
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
1112+
while not pagination_complete:
1113+
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
1114+
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))
1115+
1116+
if self.filter_old_records:
1117+
records = self._filter_old_records(records)
1118+
1119+
for record in records:
1120+
cursor = self._field_to_datetime(record[self.updated_at_field])
1121+
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
1122+
yield record
1123+
1124+
next_page_token = self.next_page_token(response)
1125+
if self.state and next_page_token and next_page_token["offset"] >= 10000:
1126+
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
1127+
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
1128+
# descending order, we stop getting records if we have already reached 10,000. Attempting
1129+
# to get more than 10K will result in a HTTP 400 error.
1130+
# https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
1131+
next_page_token = None
1132+
1133+
if not next_page_token:
1134+
pagination_complete = True
1135+
1136+
# Always return an empty generator just in case no records were ever yielded
1137+
yield from []
1138+
1139+
self._update_state(latest_cursor=latest_cursor)
10921140

10931141

10941142
class Forms(Stream):

airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
1313
from source_hubspot.errors import HubspotRateLimited
1414
from source_hubspot.source import SourceHubspot
15-
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties
15+
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Stream, Workflows, split_properties
1616

1717
NUMBER_OF_PROPERTIES = 2000
1818

@@ -399,3 +399,106 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
399399
# Instead, it should use the new state to start a new search query.
400400
assert len(records) == 11000
401401
assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00"
402+
403+
404+
def test_engagements_stream_pagination_works(requests_mock, common_params):
405+
"""
406+
Tests the engagements stream handles pagination correctly, for both
407+
full_refresh and incremental sync modes.
408+
"""
409+
410+
# Mocking Request
411+
requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?hapikey=test_api_key&count=250", [
412+
{
413+
"json": {
414+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250)],
415+
"hasMore": True,
416+
"offset": 250
417+
},
418+
"status_code": 200,
419+
},
420+
{
421+
"json": {
422+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250, 500)],
423+
"hasMore": True,
424+
"offset": 500
425+
},
426+
"status_code": 200,
427+
},
428+
{
429+
"json": {
430+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595251}} for y in range(500, 600)],
431+
"hasMore": False
432+
},
433+
"status_code": 200,
434+
}
435+
])
436+
437+
requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", [
438+
{
439+
"json": {
440+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
441+
"hasMore": True,
442+
"offset": 100
443+
},
444+
"status_code": 200,
445+
},
446+
{
447+
"json": {
448+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100, 200)],
449+
"hasMore": True,
450+
"offset": 200
451+
},
452+
"status_code": 200,
453+
},
454+
{
455+
"json": {
456+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(200, 250)],
457+
"hasMore": False
458+
},
459+
"status_code": 200,
460+
}
461+
])
462+
463+
# Create test_stream instance for full refresh.
464+
test_stream = Engagements(**common_params)
465+
466+
records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
467+
# The stream should handle pagination correctly and output 600 records.
468+
assert len(records) == 600
469+
assert test_stream.state["lastUpdated"] == 1641234595251
470+
471+
records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
472+
# The stream should handle pagination correctly and output 600 records.
473+
assert len(records) == 250
474+
assert test_stream.state["lastUpdated"] == 1641234595252
475+
476+
477+
def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list):
478+
"""
479+
If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint,
480+
the Engagements instance should stop at the 10Kth record.
481+
"""
482+
483+
responses = [
484+
{
485+
"json": {
486+
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
487+
"hasMore": True,
488+
"offset": x*100
489+
},
490+
"status_code": 200,
491+
}
492+
for x in range(1, 102)
493+
]
494+
495+
# Create test_stream instance with some state
496+
test_stream = Engagements(**common_params)
497+
test_stream.state = {"lastUpdated": 1641234595251}
498+
499+
# Mocking Request
500+
requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses)
501+
records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
502+
# The stream should not attempt to get more than 10K records.
503+
assert len(records) == 10000
504+
assert test_stream.state["lastUpdated"] == +1641234595252

0 commit comments

Comments
 (0)