-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #7674] contacts_list_memberships incremental using client filt… #38128
[ISSUE #7674] contacts_list_memberships incremental using client filt… #38128
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
@@ -891,7 +891,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin | |||
# save the state | |||
self.state = {self.cursor_field: int(max_state) if int_field_type else max_state} | |||
# emmit record if it has bigger cursor value compare to the state (`True` only) | |||
return record_value > state_value | |||
return record_value >= state_value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very edge case but we had a similar discussion for Salesforce here
@@ -1369,6 +1369,7 @@ class ContactsAllBase(Stream): | |||
page_filter = "vidOffset" | |||
page_field = "vid-offset" | |||
primary_key = "canonical-vid" | |||
limit_field = "count" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous implementation was using limit
instead of count
. This should have no impact as we pass the default value which is 100
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General question: did we gain any speed boost out of this change?
airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
Show resolved
Hide resolved
@bazarnov The goal what not speed boost. I expect there would be none in that case. The goal is to only emits the record the customer needs as some customers will have too much data on full_refresh and hence it will cost a lot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changes to contacts_list_memberships
seems reasonable, but i also think that we might as well port contacts_form_submissions
and contacts_merged_audit
to the same functionality because all three can be expressed as semi-incremental.
Looking at the graph you mentioned in the RFR PR here , it sounds like the biggest issue w/ these streams is the cost associated with a full refresh load on every attempt to the destination warehouse.
And if so, I'm okay just saying we make all of them semi-incremental instead of RFR since that solves the issue of cost and reliability is already high enough
airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
Show resolved
Hide resolved
@brianjlai you are right that it would have other benefits (i.e. stream would be smaller and therefore there would be smaller load on the destination) and that there are other streams that could provide this benefit. I terms of sequencing, I would release this change for our client to be able to leverage this and we can make |
…ering
What
Addressing https://github.com/airbytehq/airbyte-internal-issues/issues/7674
A customer of ours is using this stream but this is emitted too many records leading to some very expensive syncs. We don't have a good way to do server side filtering so we will use client side filtering for now.
Deletes are not required for this.
How
Having
ContactsListMemberships
inherit from ClientSideIncrementalStreamReview guide
airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py
airbyte-integrations/connectors/source-hubspot/unit_tests/integrations/test_contacts_list_memberships.py
User Impact
The stream now supports incremental. Indeed
python main.py discover --config secrets/config.json
will yieldUsing
main.py read --config secrets/config.json --catalog sample_files/test_catalog.json --debug
test_catalog.json
I get
{"type": "LOG", "log": {"level": "INFO", "message": "Read 1025 records from contacts_list_memberships stream"}}
Running the same thing with state
{ "timestamp": "1714412405830" }
, I get{"type": "LOG", "log": {"level": "INFO", "message": "Read 3 records from contacts_list_memberships stream"}}
.Can this PR be safely reverted and rolled back?