Skip to content

low-code: Yield records from generators instead of keeping them in in-memory lists #36406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into master from alex/records_generator
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3110caa
yield from generators
girarda Mar 22, 2024
10640ab
update interfaces and fix the tests
girarda Mar 22, 2024
148419e
fix some mypy issues
girarda Mar 22, 2024
3a7a0b8
fix the rest of mypy issues
girarda Mar 22, 2024
09e8696
update the schema
girarda Mar 22, 2024
30ba4d4
Add last_page_size and last_record to pagination context
girarda Mar 22, 2024
97449e6
boundaries check
girarda Mar 25, 2024
a7b57dd
add a test
girarda Mar 25, 2024
ea63f36
format
girarda Mar 26, 2024
d89dcbe
Merge branch 'master' into alex/replace_last_records_from_paginators
girarda Mar 26, 2024
2aaa8d4
missing unit test
girarda Mar 26, 2024
4e193d3
Merge branch 'alex/replace_last_records_from_paginators' of github.co…
girarda Mar 26, 2024
7b514e0
missing newline
girarda Mar 26, 2024
855e77f
merge
girarda Mar 26, 2024
834fc96
fix imports
girarda Mar 26, 2024
341d671
update unit tests
girarda Mar 26, 2024
db9721b
merge
girarda May 9, 2024
28583e5
fixes
girarda May 9, 2024
f75fb47
Revert "Airbyte CDK: use pytz.utc instead of datetime.utc (#38026)"
girarda May 9, 2024
d92ec4c
Merge branch 'master' into alex/records_generator
girarda May 10, 2024
5ce3693
Revert "Revert "Airbyte CDK: use pytz.utc instead of datetime.utc (#3…
girarda May 10, 2024
cbf9d12
typing
girarda May 10, 2024
b0456d1
Merge branch 'master' into alex/records_generator
girarda May 12, 2024
43983ed
Revert "Revert "Revert "Airbyte CDK: use pytz.utc instead of datetime…
girarda May 12, 2024
cf0670c
remove unused variable
girarda May 13, 2024
c7b94f9
hack. revert this before merging
girarda May 13, 2024
6732c79
Revert "hack. revert this before merging"
girarda May 13, 2024
ff3a83e
Revert "Revert "Revert "Revert "Airbyte CDK: use pytz.utc instead of …
girarda May 13, 2024
9be956c
merge master
girarda May 14, 2024
2cdcc4e
format
girarda May 14, 2024
69f5fb7
fix merge
girarda May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,12 @@ definitions:
interpolation_context:
- config
- headers
- last_records
- last_page_size
- last_record
- response
examples:
- "{{ headers.link.next.cursor }}"
- "{{ last_records[-1]['key'] }}"
- "{{ last_record['key'] }}"
- "{{ response['nextPage'] }}"
page_size:
title: Page Size
Expand All @@ -372,7 +373,7 @@ definitions:
interpolation_context:
- config
- headers
- last_records
- last_record
- response
examples:
- "{{ response.data.has_more is false }}"
Expand Down Expand Up @@ -2306,20 +2307,20 @@ interpolation:
x-ratelimit-limit: "600"
x-ratelimit-remaining: "598"
x-ratelimit-reset: "39"
- title: last_records
description: List of records extracted from the last response received from the API.
type: list
- title: last_record
description: Last record extracted from the response received from the API.
type: object
examples:
- name: "Test List: 19"
id: 0236d6d2
contact_count: 20
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2
- title: last_page_size
description: Number of records extracted from the last response received from the API.
type: object
examples:
- - name: "Test List: 19"
id: 0236d6d2
contact_count: 20
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2
- name: List for CI tests, number 30
id: 041ee031
contact_count: 0
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/041ee031
- 2
- title: next_page_token
description: Object describing the token to fetch the next page of records. The object has a single key "next_page_token".
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union
from typing import Any, Iterable, List, Mapping, Union

import dpath.util
import requests
Expand Down Expand Up @@ -58,24 +58,22 @@ class DpathExtractor(RecordExtractor):
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]):
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self.field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]

def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
response_body = self.decoder.decode(response)
if len(self.field_path) == 0:
if len(self._field_path) == 0:
extracted = response_body
else:
path = [path.eval(self.config) for path in self.field_path]
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.util.values(response_body, path)
else:
extracted = dpath.util.get(response_body, path, default=[])
if isinstance(extracted, list):
return extracted
yield from extracted
elif extracted:
return [extracted]
yield extracted
else:
return []
yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
Expand All @@ -25,7 +25,7 @@ def select_records(
records_schema: Mapping[str, Any],
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
) -> Iterable[Record]:
"""
Selects records from the response
:param response: The response to select the records from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping
from typing import Any, Iterable, List, Mapping

import requests

Expand All @@ -19,7 +19,7 @@ class RecordExtractor:
def extract_records(
self,
response: requests.Response,
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
"""
Selects records from the response
:param response: The response to extract the records from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState
Expand All @@ -27,10 +27,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def filter_records(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)]
for record in records:
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
Expand Down Expand Up @@ -50,7 +50,7 @@ def select_records(
records_schema: Mapping[str, Any],
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
) -> Iterable[Record]:
"""
Selects records from the response
:param response: The response to select the records from
Expand All @@ -60,38 +60,47 @@ def select_records(
:param next_page_token: The paginator token
:return: Iterable of Records selected from the response
"""
all_data = self.extractor.extract_records(response)
all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
self._transform(filtered_data, stream_state, stream_slice)
self._normalize_by_schema(filtered_data, schema=records_schema)
return [Record(data, stream_slice) for data in filtered_data]
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
for data in normalized_data:
yield Record(data, stream_slice)

def _normalize_by_schema(self, records: List[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]) -> List[Mapping[str, Any]]:
def _normalize_by_schema(
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
if schema:
# record has type Mapping[str, Any], but dict[str, Any] expected
return [self.schema_normalization.transform(record, schema) for record in records] # type: ignore
return records
for record in records:
normalized_record = dict(record)
self.schema_normalization.transform(normalized_record, schema)
yield normalized_record
else:
yield from records

def _filter(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice],
next_page_token: Optional[Mapping[str, Any]],
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
if self.record_filter:
return self.record_filter.filter_records(
yield from self.record_filter.filter_records(
records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
return records
else:
yield from records

def _transform(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> None:
) -> Iterable[Mapping[str, Any]]:
for record in records:
for transformation in self.transformations:
# record has type Mapping[str, Any], but Record expected
transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class CursorPagination(BaseModel):
description='Value of the cursor defining the next page to fetch.',
examples=[
'{{ headers.link.next.cursor }}',
"{{ last_records[-1]['key'] }}",
"{{ last_record['key'] }}",
"{{ response['nextPage'] }}",
],
title='Cursor Value',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token = self.pagination_strategy.initial_token

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(response, last_records)
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(response, last_page_size, last_record)
if self._token:
return {"next_page_token": self._token}
else:
Expand Down Expand Up @@ -164,9 +166,9 @@ def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name.eval(config=self.config)] = self._token
options[self.page_token_option.field_name.eval(config=self.config)] = self._token # type: ignore # field_name is known to be an interpolated string
if self.page_size_option and self.pagination_strategy.get_page_size() and self.page_size_option.inject_into == option_type:
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size()
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size() # type: ignore # field_name is known to be an interpolated string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh i hate these so much

return options


Expand All @@ -185,12 +187,14 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No
self._decorated = decorated
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
if self._page_count >= self._maximum_number_of_pages:
return None

self._page_count += 1
return self._decorated.next_page_token(response, last_records)
return self._decorated.next_page_token(response, last_page_size, last_record)

def path(self) -> Optional[str]:
return self._decorated.path()
Expand All @@ -201,7 +205,7 @@ def get_request_params(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
) -> Mapping[str, Any]:
return self._decorated.get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

def get_request_headers(
Expand All @@ -219,7 +223,7 @@ def get_request_body_data(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping[str, Any], str]]:
) -> Union[Mapping[str, Any], str]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never returns None

return self._decorated.get_request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

def get_request_body_json(
Expand All @@ -228,7 +232,7 @@ def get_request_body_json(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
) -> Mapping[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never returns None

return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return {}

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Mapping[str, Any]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
return {}

def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ def reset(self) -> None:
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
"""
Returns the next_page_token to use to fetch the next page of records.

:param response: the response to process
:param last_records: the records extracted from the response
:param last_page_size: the number of records read from the response
:param last_record: the last record extracted from the response
:return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
"""
pass
Expand Down
Loading
Loading