Skip to content

Commit a88a90b

Browse files
feat(source-hubspot): incremental streams to low code: email_events, engagements, subscription_changes (#58592)
Co-authored-by: Octavia Squidington III <[email protected]>
1 parent ad16606 commit a88a90b

File tree

12 files changed

+2025
-1535
lines changed

12 files changed

+2025
-1535
lines changed

airbyte-integrations/connectors/source-hubspot/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
13-
dockerImageTag: 4.11.0
13+
dockerImageTag: 4.12.0
1414
dockerRepository: airbyte/source-hubspot
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
1616
erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships

airbyte-integrations/connectors/source-hubspot/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.11.0"
6+
version = "4.12.0"
77
name = "source-hubspot"
88
description = "Source implementation for HubSpot."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-hubspot/source_hubspot/components.py

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,23 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
6+
from datetime import timedelta
7+
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
78

89
import dpath
910
import requests
1011

12+
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
1113
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
1214
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
1315
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1416
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1517
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
18+
from airbyte_cdk.sources.declarative.requesters import HttpRequester
1619
from airbyte_cdk.sources.declarative.requesters.requester import Requester
1720
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1821
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
22+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now, ab_datetime_parse
1923

2024

2125
class NewtoLegacyFieldTransformation(RecordTransformation):
@@ -64,14 +68,21 @@ def transform(
6468
class MigrateEmptyStringState(StateMigration):
6569
cursor_field: str
6670
config: Config
71+
cursor_format: Optional[str] = None
6772

68-
def __init__(self, cursor_field, config: Config):
73+
def __init__(self, cursor_field, config: Config, cursor_format: Optional[str] = None):
6974
self.cursor_field = cursor_field
75+
self.cursor_format = cursor_format
7076
self.config = config
7177

7278
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
7379
# if start date wasn't provided in the config default date will be used
7480
start_date = self.config.get("start_date", "2006-06-01T00:00:00.000Z")
81+
if self.cursor_format:
82+
dt = ab_datetime_parse(start_date)
83+
formatted_start_date = DatetimeParser().format(dt, self.cursor_format)
84+
return {self.cursor_field: formatted_start_date}
85+
7586
return {self.cursor_field: start_date}
7687

7788
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
@@ -167,3 +178,87 @@ def transform(
167178

168179
for data in additional_data:
169180
record.update(data)
181+
182+
183+
class EngagementsHttpRequester(HttpRequester):
184+
"""
185+
Engagements stream uses different endpoints:
186+
- Engagements Recent if start_date/state is less than 30 days and API is able to return all records (<10k), or
187+
- Engagements All which extracts all records, but supports filter on connector side
188+
189+
Recent Engagements API:
190+
https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
191+
192+
Important: This endpoint returns only last 10k most recently updated records in the last 30 days.
193+
194+
All Engagements API:
195+
https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements
196+
197+
Important:
198+
199+
1. The stream is declared to use one stream slice from start date(default/config/state) to time.now(). It doesn't have step.
200+
Based on this we can use stream_slice["start_time"] and be sure that this is equal to value in initial state.
201+
Stream Slice [start_time] is used to define _use_recent_api, concurrent processing of date windows is incompatible and therefore does not support using a step
202+
2.The stream is declared to use 250 as page size param in pagination.
203+
Recent Engagements API have 100 as max param but doesn't fail is bigger value was provided and returns to 100 as default.
204+
3. The stream has is_client_side_incremental=true to filter Engagements All response.
205+
"""
206+
207+
recent_api_total_records_limit = 10000
208+
recent_api_last_days_limit = 29
209+
210+
recent_api_path = "/engagements/v1/engagements/recent/modified"
211+
all_api_path = "/engagements/v1/engagements/paged"
212+
213+
_use_recent_api = None
214+
215+
def should_use_recent_api(self, stream_slice: StreamSlice) -> bool:
216+
if self._use_recent_api is not None:
217+
return self._use_recent_api
218+
219+
# Recent engagements API returns records updated in the last 30 days only. If start time is older All engagements API should be used
220+
if int(stream_slice["start_time"]) >= int(
221+
DatetimeParser().format((ab_datetime_now() - timedelta(days=self.recent_api_last_days_limit)), "%ms")
222+
):
223+
# Recent engagements API returns only 10k most recently updated records.
224+
# API response indicates that there are more records so All engagements API should be used
225+
_, response = self._http_client.send_request(
226+
http_method=self.get_method().value,
227+
url=self._join_url(self.get_url_base(), self.recent_api_path),
228+
headers=self._request_headers({}, stream_slice, {}, {}),
229+
params={"count": 250, "since": stream_slice["start_time"]},
230+
request_kwargs={"stream": self.stream_response},
231+
)
232+
if response.json().get("total") <= self.recent_api_total_records_limit:
233+
self._use_recent_api = True
234+
else:
235+
self._use_recent_api = False
236+
237+
return self._use_recent_api
238+
239+
def get_path(
240+
self,
241+
*,
242+
stream_state: Optional[StreamState] = None,
243+
stream_slice: Optional[StreamSlice] = None,
244+
next_page_token: Optional[Mapping[str, Any]] = None,
245+
) -> str:
246+
if self.should_use_recent_api(stream_slice):
247+
return self.recent_api_path
248+
return self.all_api_path
249+
250+
def get_request_params(
251+
self,
252+
*,
253+
stream_state: Optional[StreamState] = None,
254+
stream_slice: Optional[StreamSlice] = None,
255+
next_page_token: Optional[Mapping[str, Any]] = None,
256+
) -> MutableMapping[str, Any]:
257+
request_params = self._request_options_provider.get_request_params(
258+
stream_state=stream_state,
259+
stream_slice=stream_slice,
260+
next_page_token=next_page_token,
261+
)
262+
if self.should_use_recent_api(stream_slice):
263+
request_params.update({"since": stream_slice["start_time"]})
264+
return request_params

0 commit comments

Comments
 (0)