Skip to content

feat(source-hubspot) crm object streams to low code: goals #59727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerImageTag: 4.12.0
dockerImageTag: 4.13.0
dockerRepository: airbyte/source-hubspot
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships
Expand Down
645 changes: 326 additions & 319 deletions airbyte-integrations/connectors/source-hubspot/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.12.0"
version = "4.13.0"
name = "source-hubspot"
description = "Source implementation for HubSpot."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now, ab_datetime_parse
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_now, ab_datetime_parse


class NewtoLegacyFieldTransformation(RecordTransformation):
Expand Down Expand Up @@ -262,3 +263,94 @@ def get_request_params(
if self.should_use_recent_api(stream_slice):
request_params.update({"since": stream_slice["start_time"]})
return request_params


@dataclass
class HubspotSchemaExtractor(RecordExtractor):
    """
    Record extractor that wraps the entire decoded response in a single record.

    DynamicSchemaLoader only accepts the set of dynamic schema fields as a single record, so every
    item produced by the decoder is gathered into one list and emitted under the "properties" key.
    This might be doable with the existing DpathExtractor configuration.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """Yield exactly one record shaped as ``{"properties": [<decoded items>]}``."""
        decoded_properties = list(self.decoder.decode(response))
        yield {"properties": decoded_properties}


@dataclass
class HubspotRenamePropertiesTransformation(RecordTransformation):
    """
    In-place transformation that reshapes a schema record.

    The "properties" entry is wrapped into a JSON-schema style object definition, and every other
    top-level key is renamed with a ``properties_`` prefix.
    """

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rewrite ``record`` in place; nothing is returned.

        ``config``, ``stream_state`` and ``stream_slice`` are accepted but not used.
        """
        reshaped: Dict[str, Any] = {}
        for name, definition in record.items():
            if name != "properties":
                # Top-level properties must carry the properties_ prefix; this is done here
                # because no manifest-only way of renaming them was identified.
                reshaped[f"properties_{name}"] = definition
                continue

            # Wrap the properties so that the entry adheres to JSON schema format.
            # The same result could be expressed in the manifest with:
            #   type: AddFields
            #   fields:
            #     - path: [ "properties", "properties" ]
            #       value: "{{ record }}"
            #     - path: [ "properties" ]
            #       value: "{{ {'type': 'object'} }}"
            reshaped[name] = {"type": "object", "properties": definition}

        # Swap the content while keeping the caller's dict object.
        record.clear()
        record.update(reshaped)


class EntitySchemaNormalization(TypeTransformer):
    """
    Type transformer that casts string values according to the declared field schema.

    The transformer is always configured with ``TransformConfig.CustomSchemaNormalization`` and
    registers the callable returned by ``get_transform_function`` as its custom transform.
    """

    def __init__(self, *args, **kwargs):
        # Any positional/keyword arguments are accepted but ignored; the parent is always
        # initialised with the custom-normalization config.
        config = TransformConfig.CustomSchemaNormalization
        super().__init__(config)
        self.registerCustomTransform(self.get_transform_function())

    @staticmethod
    def get_transform_function():
        """
        Build the custom transform callable.

        The returned function takes ``(original_value, field_schema)`` and returns:

        * non-string values unchanged;
        * ``None`` for an empty string;
        * an ``int`` or ``float`` when the schema type contains "number" (the original string is
          returned unchanged if it cannot be parsed as a number);
        * a ``bool`` when the schema type contains "boolean" and the value is "true"/"false"
          in any letter case;
        * a re-formatted date / date-time string when the schema declares that format, unless the
          schema sets ``__ab_apply_cast_datetime`` to ``False``;
        * the original value otherwise.
        """
        import logging

        logger = logging.getLogger("airbyte")

        def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any:
            if not isinstance(original_value, str):
                return original_value
            if original_value == "":
                # do not cast empty strings, return None instead to be properly cast.
                return None

            # "type" may be a string, a list such as ["null", "number"], or missing altogether;
            # fall back to an empty tuple so the membership tests below never hit None.
            target_type = field_schema.get("type") or ()
            target_format = field_schema.get("format")

            if "number" in target_type:
                # do not cast numeric IDs into float, use integer instead
                numeric_cast = int if original_value.isnumeric() else float
                try:
                    return numeric_cast(original_value.replace(",", ""))
                except ValueError:
                    # The declared type is numeric but the value is not parseable as a number:
                    # keep the raw value instead of failing the whole sync. The value itself is
                    # deliberately not logged because it may contain customer data.
                    logger.warning(
                        "Could not cast a string value to %s; returning the original value.",
                        numeric_cast.__name__,
                    )
                    return original_value

            if "boolean" in target_type and original_value.lower() in ("true", "false"):
                return original_value.lower() == "true"

            if target_format:
                if field_schema.get("__ab_apply_cast_datetime") is False:
                    # The schema explicitly opted out of date/date-time casting for this field.
                    return original_value
                if target_format == "date":
                    dt = ab_datetime_parse(original_value)
                    return DatetimeParser().format(dt, "%Y-%m-%d")
                if target_format == "date-time":
                    dt = ab_datetime_parse(original_value)
                    return ab_datetime_format(dt)

            return original_value

        return transform_function
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,126 @@ definitions:
cursor_field: timestamp
cursor_format: "%ms"

# Low-code definition of the `goals` stream. Records are read from
# /crm/v3/objects/goal_targets with `archived=false`.
goals_stream:
$ref: "#/definitions/stream_base"
name: goals
primary_key:
- id
retriever:
$ref: "#/definitions/base_retriever"
requester:
$ref: "#/definitions/base_requester"
path: /crm/v3/objects/goal_targets
request_parameters:
archived: "false"
# The names of the properties to request are read from the `name` field of the
# records returned by the properties endpoint configured below.
properties:
type: QueryProperties
property_list:
type: PropertiesFromEndpoint
property_field_path: ["name"]
retriever:
type: SimpleRetriever
requester:
type: HttpRequester
url_base: https://api.hubapi.com
authenticator:
$ref: "#/definitions/authenticator"
path: /properties/v2/goal_targets/properties
http_method: GET
request_headers:
Content-Type: "application/json"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
# Properties are chunked by count, with a limit of 50 per chunk.
property_chunking:
type: PropertyChunking
property_limit_type: property_count
property_limit: 50
record_selector:
$ref: "#/definitions/base_selector"
# Values are normalized by the connector's custom EntitySchemaNormalization class.
schema_normalization:
type: CustomSchemaNormalization
class_name: source_hubspot.components.EntitySchemaNormalization
# Client-side incremental sync on `updatedAt`. The start defaults to
# 2006-06-01T00:00:00Z when `start_date` is absent from the config.
incremental_sync:
type: DatetimeBasedCursor
cursor_field: updatedAt
start_datetime:
type: MinMaxDatetime
datetime: "{{ format_datetime(config.get('start_date', '2006-06-01T00:00:00Z'), '%ms', '%Y-%m-%dT%H:%M:%SZ') }}"
datetime_format: "%ms"
datetime_format: "%ms"
lookback_window: P{{ config.get('lookback_window', 0) }}D
is_client_side_incremental: true
cursor_datetime_formats:
- "%ms"
- "%Y-%m-%dT%H:%M:%S.%fZ"
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%dT%H:%M:%SZ"
# Flattens the fields found under `properties`, applying the `properties_` key prefix.
transformations:
- type: DpathFlattenFields
field_path:
- properties
key_transformation:
type: KeyTransformation
prefix: properties_
# Two schema loaders are configured: a dynamic one driven by the properties
# endpoint, followed by the static inline `goals` schema.
schema_loader:
- type: DynamicSchemaLoader
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/base_requester"
path: /properties/v2/goal_targets/properties
record_selector:
type: RecordSelector
extractor:
type: CustomRecordExtractor
class_name: source_hubspot.components.HubspotSchemaExtractor
field_path: []
schema_transformations:
- type: AddFields
fields:
- path: ["properties"]
value: "{{ record }}"
- type: CustomTransformation
class_name: source_hubspot.components.HubspotRenamePropertiesTransformation
schema_type_identifier:
type: SchemaTypeIdentifier
key_pointer: ["name"]
type_pointer: ["type"]
schema_pointer: ["properties"]
# Mapping from the property type reported by the endpoint (`current_type`)
# to the type used in the generated schema (`target_type`).
types_mapping:
- type: TypesMap
target_type: string
current_type: enumeration
- type: TypesMap
target_type: timestamp_with_timezone
current_type: datetime
- type: TypesMap
target_type: timestamp_with_timezone
current_type: date-time
- type: TypesMap
target_type: boolean
current_type: bool
- type: TypesMap
target_type: date
current_type: date
- type: TypesMap
target_type: string
current_type: json
- type: TypesMap
target_type: string
current_type: phone_number
- type: InlineSchemaLoader
schema:
$ref: "#/schemas/goals"
# NOTE(review): MigrateEmptyStringState is defined outside this view; presumably it
# migrates a state whose cursor value is an empty string - confirm.
state_migrations:
- type: CustomStateMigration
class_name: source_hubspot.components.MigrateEmptyStringState
cursor_field: updatedAt
cursor_format: "%Y-%m-%dT%H:%M:%S.%fZ"

streams:
- "#/definitions/campaigns_stream"
- "#/definitions/companies_property_history_stream"
Expand All @@ -897,6 +1017,7 @@ streams:
- "#/definitions/email_events_stream"
- "#/definitions/engagements_stream"
- "#/definitions/subscription_changes_stream"
- "#/definitions/goals_stream"

# HubSpot account is limited to 110 requests every 10 seconds https://developers.hubspot.com/docs/guides/apps/api-usage/usage-details#rate-limits
concurrency_level:
Expand Down Expand Up @@ -4637,3 +4758,35 @@ schemas:
type:
- "null"
- integer

# Static part of the `goals` stream schema. Only the top-level record fields are
# declared here; `additionalProperties: true` allows further fields.
goals:
$schema: http://json-schema.org/draft-07/schema#
type:
- "null"
- object
additionalProperties: true
properties:
id:
description: Unique identifier for the goal.
type:
- "null"
- string
createdAt:
description: Date and time when the goal was created.
type:
- "null"
- string
format: date-time
# Flag read by source_hubspot.components.EntitySchemaNormalization: when it is
# false, the string value is returned as-is instead of being re-formatted.
__ab_apply_cast_datetime: false
updatedAt:
description: Date and time when the goal was last updated.
type:
- "null"
- string
format: date-time
# Same opt-out as on createdAt: the value is left unchanged by the normalization.
__ab_apply_cast_datetime: false
archived:
description: Indicates if the goal is archived or not.
type:
- "null"
- boolean
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
EngagementsTasksWebAnalytics,
Forms,
FormSubmissions,
Goals,
GoalsWebAnalytics,
Leads,
LineItems,
Expand Down Expand Up @@ -97,6 +96,7 @@
"email_events": {"content"},
"engagements": {"crm.objects.companies.read", "crm.objects.contacts.read", "crm.objects.deals.read", "tickets", "e-commerce"},
"subscription_changes": {"content"},
"goals": {"crm.objects.goals.read"},
}


Expand Down Expand Up @@ -209,7 +209,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
EngagementsTasks(**common_params),
Forms(**common_params),
FormSubmissions(**common_params),
Goals(**common_params),
Leads(**common_params),
LineItems(**common_params),
Owners(**common_params),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,20 @@ def find_stream(stream_name, config):
@pytest.fixture(autouse=True)
def patch_time(mocker):
mocker.patch("time.sleep")


@pytest.fixture()
def mock_dynamic_schema_requests(requests_mock):
    """Register a canned HTTP 200 response for the goal_targets properties endpoint."""
    goal_target_properties = [
        {
            "name": "hs__migration_soft_delete",
            "label": "migration_soft_delete_deprecated",
            "description": "Describes if the goal target can be treated as deleted.",
            "groupName": "goal_target_information",
            "type": "enumeration",
        }
    ]
    requests_mock.get(
        "https://api.hubapi.com/properties/v2/goal_targets/properties",
        json=goal_target_properties,
        status_code=200,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ def mock_response(cls, http_mocker: HttpMocker, request, responses, method: str
responses = [responses]
getattr(http_mocker, method)(request, responses)

@classmethod
def mock_dynamic_schema_requests(cls, http_mocker: HttpMocker):
    """
    Mock the goal_targets properties request unless one is already registered.

    A matcher is treated as already covering the dynamic stream when the path of its request
    contains "goal_targets"; in that case nothing is added.
    """
    already_mocked = any(
        "goal_targets" in matcher.request._parsed_url.path for matcher in http_mocker._get_matchers()
    )
    if already_mocked:
        return

    property_templates = [{"name": "hs__test_field", "type": "enumeration"}]
    properties_response = RootHttpResponseBuilder(property_templates)
    http_mocker.get(
        PropertiesRequestBuilder().for_entity("goal_targets").build(),
        properties_response.build(),
    )

@classmethod
def record_builder(cls, stream: str, record_cursor_path):
return create_record_builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,22 @@ def _set_up_oauth(self, http_mocker: HttpMocker):
self.mock_oauth(http_mocker, self.ACCESS_TOKEN)
self.mock_scopes(http_mocker, self.ACCESS_TOKEN, self.SCOPES)

def _set_up_requests(self, http_mocker: HttpMocker, with_oauth: bool = False):
def _set_up_requests(self, http_mocker: HttpMocker, with_oauth: bool = False, with_dynamic_schemas: bool = True):
    """
    Register the HTTP mocks shared by the tests of this class.

    OAuth mocks are added only when ``with_oauth`` is true. Custom-object and property mocks are
    always added. The dynamic schema mock is added last, unless ``with_dynamic_schemas`` is false.
    """
    if with_oauth:
        self._set_up_oauth(http_mocker)

    self.mock_custom_objects(http_mocker)
    self.mock_properties(http_mocker, self.OBJECT_TYPE, self.PROPERTIES)

    if not with_dynamic_schemas:
        return
    self.mock_dynamic_schema_requests(http_mocker)

@HttpMocker()
def test_given_oauth_authentication_when_read_then_perform_authenticated_queries(self, http_mocker: HttpMocker):
self._set_up_requests(http_mocker, with_oauth=True)
self._set_up_requests(http_mocker, with_oauth=True, with_dynamic_schemas=False)
self.read_from_stream(self.oauth_config(), self.STREAM_NAME, SyncMode.full_refresh)

@HttpMocker()
def test_given_records_when_read_extract_desired_records(self, http_mocker: HttpMocker):
self._set_up_requests(http_mocker, with_oauth=True)
self._set_up_requests(http_mocker, with_oauth=True, with_dynamic_schemas=False)
self.mock_response(http_mocker, self.request(), self.response())
output = self.read_from_stream(self.oauth_config(), self.STREAM_NAME, SyncMode.full_refresh)
assert len(output.records) == 1
Expand Down
Loading
Loading