Commit 2282a4a

🎉 Source Hubspot: Migrate to CDK (#10177)

* migrate SourceHubspot to cdk
* refactor discover method
* change method name
* deleted Client class
* remove comment
* added get_updated_state
* fix setting initial state
* fix stream_state dict key
* fix cursor_field
* change check test case status
* refactor streams method
* remove comment
* remove TODOs
* remove comments
* fix get_updated_state
* refactor chunk_read
* override _read_incremental
* fix unit tests
* remove comments
* fix test_check_connection_backoff_on_server_error
* fix test_check_connection_backoff_on_server_error 2
* fix test_check_connection_backoff_on_limit_reached
* fix unit tests
* clear comments
* override read method on Source
* added comments to overriding methods
* some improvements
* refactor overridden _read_incremental
* format code
* refactor discovery
* remove discover
* format code 2
* added return types
* refactor template stream classes
* remove comments
* remove _name field
* rename api.py to streams.py
* move to HttpStream
* refactor FormSubmissions
* refactor Campaigns
* refactor ContactsListMemberships
* CRMSearchStream refactor
* CRMSearchStream refactor 2
* CRMObjectStream refactor
* DealStageHistoryStream refactor
* Deals refactor
* Engagements refactor
* path method refactor
* refactor authentication
* fix check_connection
* fix call parse_response
* fix Engagements stream
* fix CRMSearchStream
* fix CRMObjectIncremental stream
* override _read_incremental
* remove commented codes
* format code
* update cdk version
* fix cursor field
* fix unit tests
* removed client
* clear comments
* clear comments 2
* clear comments 3
* clear comments 4
* override backoff_time
* remove comment
* format code
* backoff_time modified
* refactor backoff_time
* format code
* added return typing
* format code
* removed cursor_paths
* bump version
* updated spec and def yaml

Co-authored-by: auganbay <[email protected]>
1 parent ce5c007 commit 2282a4a

12 files changed, with 826 additions and 440 deletions

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
@@ -314,7 +314,7 @@
 - name: HubSpot
   sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
   dockerRepository: airbyte/source-hubspot
-  dockerImageTag: 0.1.40
+  dockerImageTag: 0.1.41
   documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
   icon: hubspot.svg
   sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
@@ -3086,7 +3086,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-hubspot:0.1.40"
+- dockerImage: "airbyte/source-hubspot:0.1.41"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
     connectionSpecification:

airbyte-integrations/connectors/source-hubspot/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.40
+LABEL io.airbyte.version=0.1.41
 LABEL io.airbyte.name=airbyte/source-hubspot

airbyte-integrations/connectors/source-hubspot/acceptance-test-config.yml

Lines changed: 1 addition & 6 deletions
@@ -10,7 +10,7 @@ tests:
     - config_path: "integration_tests/invalid_config.json"
       status: "failed"
     - config_path: "integration_tests/invalid_config_oauth.json"
-      status: "exception"
+      status: "failed"
     - config_path: "integration_tests/invalid_config_wrong_title.json"
       status: "exception"
   discovery:
@@ -32,11 +32,6 @@ tests:
     - config_path: "secrets/config.json"
       configured_catalog_path: "sample_files/configured_catalog.json"
       future_state_path: "integration_tests/abnormal_state.json"
-      cursor_paths:
-        subscription_changes: ["timestamp"]
-        email_events: ["timestamp"]
-        contact_lists: ["timestamp"]
-        property_history: ["timestamp"]
   full_refresh:
     - config_path: "secrets/config.json"
       configured_catalog_path: "sample_files/full_refresh_catalog.json"

airbyte-integrations/connectors/source-hubspot/integration_tests/abnormal_state.json

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@
     "updatedAt": "2221-10-12T13:37:56.412000+00:00"
   },
   "contact_lists": {
-    "timestamp": "2221-10-12T13:37:56.412000+00:00"
+    "updatedAt": "2221-10-12T13:37:56.412000+00:00"
   },
   "contacts": {
     "updatedAt": "2221-10-12T13:37:56.412000+00:00"
@@ -12,7 +12,7 @@
     "updatedAt": "2221-10-12T13:37:56.412000+00:00"
   },
   "email_events": {
-    "timestamp": "2221-10-12T13:37:56.412000+00:00"
+    "created": "2221-10-12T13:37:56.412000+00:00"
   },
   "engagements_calls": {
     "updatedAt": "2221-10-12T13:37:56.412000+00:00"
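The state file above keys each stream's saved state by that stream's cursor field, which is why renaming a cursor (for example `timestamp` to `updatedAt`) also changes the key in the fixture. The following is a minimal sketch of that idea only; `advance_state` and the sample records are hypothetical and are not taken from the connector's `get_updated_state`.

```python
from typing import Any, Dict, Mapping


def advance_state(
    current_state: Mapping[str, Any],
    latest_record: Mapping[str, Any],
    cursor_field: str,
) -> Dict[str, Any]:
    """Return the stream state after seeing `latest_record` (hypothetical helper).

    Assumes cursor values are ISO-8601 strings in one consistent format and
    UTC offset, so plain string comparison orders them chronologically.
    """
    latest = latest_record.get(cursor_field)
    current = current_state.get(cursor_field)
    if latest is None:
        # The record carries no cursor value: keep the state unchanged.
        return dict(current_state)
    if current is None or latest > current:
        # No state saved under this cursor field yet, or the record is newer.
        return {cursor_field: latest}
    return dict(current_state)


future = {"updatedAt": "2221-10-12T13:37:56.412000+00:00"}
record = {"id": 1, "updatedAt": "2021-10-12T13:37:56.412000+00:00"}

# A far-future state is never overtaken by a present-day record.
print(advance_state(future, record, "updatedAt"))

# State saved under the old key is not seen when the cursor field is renamed.
print(advance_state({"timestamp": "2221-10-12T13:37:56.412000+00:00"}, record, "updatedAt"))
```

The second call illustrates why the fixture keys had to follow the cursor rename: state stored under `timestamp` would otherwise be ignored by a stream whose cursor is `updatedAt`.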

airbyte-integrations/connectors/source-hubspot/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
 from setuptools import find_packages, setup
 
 MAIN_REQUIREMENTS = [
-    "airbyte-cdk~=0.1",
+    "airbyte-cdk~=0.1.49",
     "backoff==1.11.1",
     "pendulum==2.1.2",
     "requests==2.26.0",
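The requirement change above narrows the compatible-release pin: under PEP 440, `~=0.1` accepts any `0.*` release at or above `0.1`, while `~=0.1.49` accepts only `0.1.*` releases at or above `0.1.49`. A simplified sketch of that rule follows; `parse` and `compatible` are hypothetical helpers that handle plain dotted numeric versions only, not the logic pip or setuptools actually run.

```python
from typing import Tuple


def parse(version: str) -> Tuple[int, ...]:
    """Parse a plain dotted numeric version such as '0.1.49' (no pre/post releases)."""
    return tuple(int(part) for part in version.split("."))


def compatible(candidate: str, spec: str) -> bool:
    """Simplified check of `candidate` against the specifier `~=spec`.

    `~=X.Y.Z` is equivalent to `>=X.Y.Z, ==X.Y.*`: the candidate must share
    the spec's prefix (the spec without its last component) and be no older.
    """
    spec_parts = parse(spec)
    cand_parts = parse(candidate)
    if len(spec_parts) < 2:
        raise ValueError("~= needs at least two version components")
    prefix = spec_parts[:-1]
    # Pad a shorter candidate with zeros so that '0.2' compares like '0.2.0'.
    padded = cand_parts + (0,) * (len(spec_parts) - len(cand_parts))
    return padded[: len(prefix)] == prefix and padded >= spec_parts


for version in ("0.1.48", "0.1.49", "0.1.56", "0.2.0"):
    print(version, compatible(version, "0.1"), compatible(version, "0.1.49"))
```

Under the old pin a future `0.2.0` release of the CDK would have been accepted; under the new pin it would not.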

airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py

Lines changed: 0 additions & 125 deletions
This file was deleted.

airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py

Lines changed: 166 additions & 14 deletions
@@ -2,25 +2,177 @@
 # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
 
+import copy
 import logging
-from typing import Any, MutableMapping
+from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple
 
-from airbyte_cdk.sources.deprecated.base_source import BaseClient, BaseSource, ConfiguredAirbyteStream
+from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
+from airbyte_cdk.sources import AbstractSource
+from airbyte_cdk.sources.deprecated.base_source import ConfiguredAirbyteStream
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
+from airbyte_cdk.utils.event_timing import create_timer
+from requests import HTTPError
+from source_hubspot.streams import (
+    API,
+    Campaigns,
+    Companies,
+    ContactLists,
+    Contacts,
+    ContactsListMemberships,
+    DealPipelines,
+    Deals,
+    EmailEvents,
+    Engagements,
+    EngagementsCalls,
+    EngagementsEmails,
+    EngagementsMeetings,
+    EngagementsNotes,
+    EngagementsTasks,
+    FeedbackSubmissions,
+    Forms,
+    FormSubmissions,
+    LineItems,
+    MarketingEmails,
+    Owners,
+    Products,
+    PropertyHistory,
+    Quotes,
+    SubscriptionChanges,
+    TicketPipelines,
+    Tickets,
+    Workflows,
+)
 
-from .client import Client
 
+class SourceHubspot(AbstractSource):
+    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
+        """Check connection"""
+        alive = True
+        error_msg = None
+        common_params = self.get_common_params(config=config)
+        try:
+            contacts = Contacts(**common_params)
+            _ = contacts.properties
+        except HTTPError as error:
+            alive = False
+            error_msg = repr(error)
 
-class SourceHubspot(BaseSource):
-    client_class = Client
+        return alive, error_msg
 
-    def _read_stream(
-        self, logger: logging.Logger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
-    ):
+    @staticmethod
+    def get_api(config: Mapping[str, Any]) -> API:
+        credentials = config.get("credentials", {})
+        return API(credentials=credentials)
+
+    def get_common_params(self, config) -> Mapping[str, Any]:
+        start_date = config.get("start_date")
+        credentials = config["credentials"]
+        api = self.get_api(config=config)
+        common_params = dict(api=api, start_date=start_date, credentials=credentials)
+
+        if credentials.get("credentials_title") == "OAuth Credentials":
+            common_params["authenticator"] = api.get_authenticator(credentials)
+        return common_params
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        credentials = config.get("credentials", {})
+        common_params = self.get_common_params(config=config)
+        streams = [
+            Campaigns(**common_params),
+            Companies(**common_params),
+            ContactLists(**common_params),
+            Contacts(**common_params),
+            ContactsListMemberships(**common_params),
+            DealPipelines(**common_params),
+            Deals(**common_params),
+            EmailEvents(**common_params),
+            Engagements(**common_params),
+            EngagementsCalls(**common_params),
+            EngagementsEmails(**common_params),
+            EngagementsMeetings(**common_params),
+            EngagementsNotes(**common_params),
+            EngagementsTasks(**common_params),
+            FeedbackSubmissions(**common_params),
+            Forms(**common_params),
+            FormSubmissions(**common_params),
+            LineItems(**common_params),
+            MarketingEmails(**common_params),
+            Owners(**common_params),
+            Products(**common_params),
+            PropertyHistory(**common_params),
+            SubscriptionChanges(**common_params),
+            Tickets(**common_params),
+            TicketPipelines(**common_params),
+            Workflows(**common_params),
+        ]
+
+        credentials_title = credentials.get("credentials_title")
+        if credentials_title == "API Key Credentials":
+            streams.append(Quotes(**common_params))
+
+        return streams
+
+    def read(
+        self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
+    ) -> Iterator[AirbyteMessage]:
+        """
+        This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream.
+        """
+        connector_state = copy.deepcopy(state or {})
+        logger.info(f"Starting syncing {self.name}")
+        config, internal_config = split_config(config)
+        # TODO assert all streams exist in the connector
+        # get the streams once in case the connector needs to make any queries to generate them
+        stream_instances = {s.name: s for s in self.streams(config)}
+        self._stream_to_instance_map = stream_instances
+        with create_timer(self.name) as timer:
+            for configured_stream in catalog.streams:
+                stream_instance = stream_instances.get(configured_stream.stream.name)
+                if not stream_instance and configured_stream.stream.name == "quotes":
+                    logger.warning("Stream `quotes` does not exist in the source. Skip reading `quotes` stream.")
+                    continue
+                if not stream_instance:
+                    raise KeyError(
+                        f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
+                    )
+
+                try:
+                    yield from self._read_stream(
+                        logger=logger,
+                        stream_instance=stream_instance,
+                        configured_stream=configured_stream,
+                        connector_state=connector_state,
+                        internal_config=internal_config,
+                    )
+                except Exception as e:
+                    logger.exception(f"Encountered an exception while reading stream {self.name}")
+                    raise e
+                finally:
+                    logger.info(f"Finished syncing {self.name}")
+                    logger.info(timer.report())
+
+        logger.info(f"Finished syncing {self.name}")
+
+    def _read_incremental(
+        self,
+        logger: logging.Logger,
+        stream_instance: Stream,
+        configured_stream: ConfiguredAirbyteStream,
+        connector_state: MutableMapping[str, Any],
+        internal_config: InternalConfig,
+    ) -> Iterator[AirbyteMessage]:
         """
-        This method is overridden to check if the stream exists in the client.
+        This method is overridden to checkpoint the latest actual state,
+        because stream state is refreshed after reading each batch of records (if need_chunk is True),
+        or reading all records in the stream.
         """
-        stream_name = configured_stream.stream.name
-        if not client._apis.get(stream_name):
-            logger.warning(f"Stream {stream_name} does not exist in the client.")
-            return
-        yield from super()._read_stream(logger=logger, client=client, configured_stream=configured_stream, state=state)
+        yield from super()._read_incremental(
+            logger=logger,
+            stream_instance=stream_instance,
+            configured_stream=configured_stream,
+            connector_state=connector_state,
+            internal_config=internal_config,
+        )
+        stream_state = stream_instance.get_updated_state(current_stream_state={}, latest_record={})
+        yield self._checkpoint_state(stream_instance, stream_state, connector_state)
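The overridden `read` in the diff above treats `quotes` as an optional stream: if it is requested but unavailable, it is skipped with a warning, while any other missing stream raises `KeyError`. The sketch below isolates that control flow using only the standard library; `read_selected`, the `available` mapping, and the sample records are hypothetical stand-ins, with no CDK classes involved.

```python
import logging
from typing import Dict, FrozenSet, Iterable, Iterator, List

logger = logging.getLogger("sketch")


def read_selected(
    available: Dict[str, List[dict]],
    requested: Iterable[str],
    optional: FrozenSet[str] = frozenset({"quotes"}),
) -> Iterator[dict]:
    """Yield records for each requested stream (hypothetical helper).

    `available` maps stream names to their records and stands in for the
    name-to-instance map that the connector builds from `streams(config)`.
    """
    for name in requested:
        records = available.get(name)
        if records is None and name in optional:
            # Optional stream missing: warn and move on, as `read` does for `quotes`.
            logger.warning("Stream `%s` does not exist in the source. Skipping it.", name)
            continue
        if records is None:
            # Any other missing stream is treated as a configuration error.
            raise KeyError(
                f"The requested stream {name} was not found in the source. "
                f"Available streams: {sorted(available)}"
            )
        yield from records


available = {"contacts": [{"id": 1}], "deals": [{"id": 7}]}
print(list(read_selected(available, ["contacts", "quotes", "deals"])))
```

In the connector, `quotes` is only added to the stream list for "API Key Credentials", so this guard is what lets a catalog that still lists `quotes` run under OAuth credentials.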
