Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

low-code: Yield records from generators instead of keeping them in in-memory lists #36406

Merged
merged 31 commits into from
May 15, 2024

Conversation

girarda
Copy link
Contributor

@girarda girarda commented Mar 22, 2024

What

Improve memory usage by yielding records from generators instead of returning lists of objects

This PR addresses a part of https://github.com/airbytehq/airbyte-internal-issues/issues/6554

How

  1. Update the record selector, extractor, and filter interfaces to work on generators instead of lists of records
  2. Update the paginator interface to only use the number of records read and the last record instead of the full list of records read
  3. Update the simple retriever to tie in everything together

Reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py
  2. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py
  3. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
  4. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py
  5. airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
  6. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py
  7. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py
  8. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
  9. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py
  10. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py
  11. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py
  12. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py
  13. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py
  14. airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
  15. airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Copy link

vercel bot commented Mar 22, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 14, 2024 11:02pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Mar 22, 2024
@girarda girarda changed the base branch from master to alex/replace_last_records_from_paginators March 26, 2024 18:10
@@ -219,7 +223,7 @@ def get_request_body_data(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping[str, Any], str]]:
) -> Union[Mapping[str, Any], str]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never returns None

@@ -228,7 +232,7 @@ def get_request_body_json(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
) -> Mapping[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never returns None

Base automatically changed from alex/replace_last_records_from_paginators to master April 2, 2024 17:43
Copy link
Contributor

@artem1205 artem1205 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@girarda girarda marked this pull request as ready for review May 9, 2024 02:52
@girarda girarda requested review from a team and brianjlai May 9, 2024 02:52
@girarda
Copy link
Contributor Author

girarda commented May 10, 2024

confirmed this change in combination with some changes on the iterable side helps with the memory usage of the connector.

The large spikes show attempts with all the fixes, and the last (much lower one) shows the memory usage with this change, using generators in the custom components, and reducing the size of the time windows:
Screenshot 2024-05-09 at 9 31 16 PM

The underlying issue is that iterable returns gigantic responses (I've seen one ~4GB). I think fixing this would require streaming the responses, which is out of scope for this PR

@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label May 13, 2024
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes look good to me, but i suspect there will be some merge conflicts since it looks like you are fixing some mypy errors on certain files that i also fixed in one of my PRs that I merged

if self.page_size_option and self.pagination_strategy.get_page_size() and self.page_size_option.inject_into == option_type:
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size()
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size() # type: ignore # field_name is known to be an interpolated string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh i hate these so much

@@ -64,7 +64,8 @@ class SimpleRetriever(Retriever):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._paginator = self.paginator or NoPagination(parameters=parameters)
self._last_response: Optional[requests.Response] = None
self._records_from_last_response: List[Record] = []
self._last_page_size: int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed a bit w/ Alex over Slack, since we yield from the previous set of records before calculating the next_page_token, we should have the up to date count of records when we pass self._last_page_size to the paginator

@girarda girarda merged commit fb11ca2 into master May 15, 2024
31 checks passed
@girarda girarda deleted the alex/records_generator branch May 15, 2024 01:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit connectors/source/chargebee
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants