Skip to content

Commit fb11ca2

Browse files
authored
low-code: Yield records from generators instead of keeping them in in-memory lists (#36406)
1 parent 4b84c63 commit fb11ca2

26 files changed

+161
-183
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py

+9-8
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, Union
6+
from typing import Any, Iterable, List, Mapping, Union
77

88
import dpath.util
99
import requests
@@ -59,23 +59,24 @@ class DpathExtractor(RecordExtractor):
5959
decoder: Decoder = JsonDecoder(parameters={})
6060

6161
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
62+
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
6263
for path_index in range(len(self.field_path)):
6364
if isinstance(self.field_path[path_index], str):
64-
self.field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
65+
self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
6566

66-
def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
67+
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
6768
response_body = self.decoder.decode(response)
68-
if len(self.field_path) == 0:
69+
if len(self._field_path) == 0:
6970
extracted = response_body
7071
else:
71-
path = [path.eval(self.config) for path in self.field_path] # type: ignore # field_path elements are always cast to interpolated string
72+
path = [path.eval(self.config) for path in self._field_path]
7273
if "*" in path:
7374
extracted = dpath.util.values(response_body, path)
7475
else:
7576
extracted = dpath.util.get(response_body, path, default=[])
7677
if isinstance(extracted, list):
77-
return extracted
78+
yield from extracted
7879
elif extracted:
79-
return [extracted]
80+
yield extracted
8081
else:
81-
return []
82+
yield from []

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py

+2-2
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
from abc import abstractmethod
66
from dataclasses import dataclass
7-
from typing import Any, List, Mapping, Optional
7+
from typing import Any, Iterable, Mapping, Optional
88

99
import requests
1010
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
@@ -25,7 +25,7 @@ def select_records(
2525
records_schema: Mapping[str, Any],
2626
stream_slice: Optional[StreamSlice] = None,
2727
next_page_token: Optional[Mapping[str, Any]] = None,
28-
) -> List[Record]:
28+
) -> Iterable[Record]:
2929
"""
3030
Selects records from the response
3131
:param response: The response to select the records from

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py

+2-3
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,9 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
54
from abc import abstractmethod
65
from dataclasses import dataclass
7-
from typing import Any, List, Mapping
6+
from typing import Any, Iterable, Mapping
87

98
import requests
109

@@ -19,7 +18,7 @@ class RecordExtractor:
1918
def extract_records(
2019
self,
2120
response: requests.Response,
22-
) -> List[Mapping[str, Any]]:
21+
) -> Iterable[Mapping[str, Any]]:
2322
"""
2423
Selects records from the response
2524
:param response: The response to extract the records from

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py

+6-4
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, Optional
6+
from typing import Any, Iterable, Mapping, Optional
77

88
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
99
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
@@ -27,10 +27,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2727

2828
def filter_records(
2929
self,
30-
records: List[Mapping[str, Any]],
30+
records: Iterable[Mapping[str, Any]],
3131
stream_state: StreamState,
3232
stream_slice: Optional[StreamSlice] = None,
3333
next_page_token: Optional[Mapping[str, Any]] = None,
34-
) -> List[Mapping[str, Any]]:
34+
) -> Iterable[Mapping[str, Any]]:
3535
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
36-
return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)]
36+
for record in records:
37+
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
38+
yield record

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py

+24-15
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6-
from typing import Any, List, Mapping, Optional
6+
from typing import Any, Iterable, List, Mapping, Optional
77

88
import requests
99
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
@@ -50,7 +50,7 @@ def select_records(
5050
records_schema: Mapping[str, Any],
5151
stream_slice: Optional[StreamSlice] = None,
5252
next_page_token: Optional[Mapping[str, Any]] = None,
53-
) -> List[Record]:
53+
) -> Iterable[Record]:
5454
"""
5555
Selects records from the response
5656
:param response: The response to select the records from
@@ -60,38 +60,47 @@ def select_records(
6060
:param next_page_token: The paginator token
6161
:return: Iterable of Records selected from the response
6262
"""
63-
all_data = self.extractor.extract_records(response)
63+
all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
6464
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
65-
self._transform(filtered_data, stream_state, stream_slice)
66-
self._normalize_by_schema(filtered_data, schema=records_schema)
67-
return [Record(data, stream_slice) for data in filtered_data]
65+
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
66+
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
67+
for data in normalized_data:
68+
yield Record(data, stream_slice)
6869

69-
def _normalize_by_schema(self, records: List[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]) -> List[Mapping[str, Any]]:
70+
def _normalize_by_schema(
71+
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
72+
) -> Iterable[Mapping[str, Any]]:
7073
if schema:
7174
# record has type Mapping[str, Any], but dict[str, Any] expected
72-
return [self.schema_normalization.transform(record, schema) for record in records] # type: ignore
73-
return records
75+
for record in records:
76+
normalized_record = dict(record)
77+
self.schema_normalization.transform(normalized_record, schema)
78+
yield normalized_record
79+
else:
80+
yield from records
7481

7582
def _filter(
7683
self,
77-
records: List[Mapping[str, Any]],
84+
records: Iterable[Mapping[str, Any]],
7885
stream_state: StreamState,
7986
stream_slice: Optional[StreamSlice],
8087
next_page_token: Optional[Mapping[str, Any]],
81-
) -> List[Mapping[str, Any]]:
88+
) -> Iterable[Mapping[str, Any]]:
8289
if self.record_filter:
83-
return self.record_filter.filter_records(
90+
yield from self.record_filter.filter_records(
8491
records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
8592
)
86-
return records
93+
else:
94+
yield from records
8795

8896
def _transform(
8997
self,
90-
records: List[Mapping[str, Any]],
98+
records: Iterable[Mapping[str, Any]],
9199
stream_state: StreamState,
92100
stream_slice: Optional[StreamSlice] = None,
93-
) -> None:
101+
) -> Iterable[Mapping[str, Any]]:
94102
for record in records:
95103
for transformation in self.transformations:
96104
# record has type Mapping[str, Any], but Record expected
97105
transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore
106+
yield record

airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+5-1
Original file line number | Diff line number | Diff line change
@@ -874,7 +874,11 @@ class CursorPagination(BaseModel):
874874
cursor_value: str = Field(
875875
...,
876876
description='Value of the cursor defining the next page to fetch.',
877-
examples=['{{ headers.link.next.cursor }}', "{{ last_record['key'] }}", "{{ response['nextPage'] }}"],
877+
examples=[
878+
'{{ headers.link.next.cursor }}',
879+
"{{ last_record['key'] }}",
880+
"{{ response['nextPage'] }}",
881+
],
878882
title='Cursor Value',
879883
)
880884
page_size: Optional[int] = Field(None, description='The number of records to include in each pages.', examples=[100], title='Page Size')

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

+9-5
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, MutableMapping, Optional, Union
6+
from typing import Any, Mapping, MutableMapping, Optional, Union
77

88
import requests
99
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
@@ -101,8 +101,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
101101
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
102102
self._token = self.pagination_strategy.initial_token
103103

104-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
105-
self._token = self.pagination_strategy.next_page_token(response, last_records)
104+
def next_page_token(
105+
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
106+
) -> Optional[Mapping[str, Any]]:
107+
self._token = self.pagination_strategy.next_page_token(response, last_page_size, last_record)
106108
if self._token:
107109
return {"next_page_token": self._token}
108110
else:
@@ -185,12 +187,14 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No
185187
self._decorated = decorated
186188
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
187189

188-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
190+
def next_page_token(
191+
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
192+
) -> Optional[Mapping[str, Any]]:
189193
if self._page_count >= self._maximum_number_of_pages:
190194
return None
191195

192196
self._page_count += 1
193-
return self._decorated.next_page_token(response, last_records)
197+
return self._decorated.next_page_token(response, last_page_size, last_record)
194198

195199
def path(self) -> Optional[str]:
196200
return self._decorated.path()

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py

+2-2
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, MutableMapping, Optional, Union
6+
from typing import Any, Mapping, MutableMapping, Optional, Union
77

88
import requests
99
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
@@ -57,7 +57,7 @@ def get_request_body_json(
5757
) -> Mapping[str, Any]:
5858
return {}
5959

60-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Mapping[str, Any]:
60+
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
6161
return {}
6262

6363
def reset(self) -> None:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py

+6-3
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
from abc import ABC, abstractmethod
66
from dataclasses import dataclass
7-
from typing import Any, List, Mapping, Optional
7+
from typing import Any, Mapping, Optional
88

99
import requests
1010
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
@@ -27,12 +27,15 @@ def reset(self) -> None:
2727
"""
2828

2929
@abstractmethod
30-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
30+
def next_page_token(
31+
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
32+
) -> Optional[Mapping[str, Any]]:
3133
"""
3234
Returns the next_page_token to use to fetch the next page of records.
3335
3436
:param response: the response to process
35-
:param last_records: the records extracted from the response
37+
:param last_page_size: the number of records read from the response
38+
:param last_record: the last record extracted from the response
3639
:return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
3740
"""
3841
pass

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

+6-11
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, Dict, List, Mapping, Optional, Union
6+
from typing import Any, Dict, Mapping, Optional, Union
77

88
import requests
99
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
@@ -40,42 +40,37 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4040
else:
4141
self._cursor_value = self.cursor_value
4242
if isinstance(self.stop_condition, str):
43-
self._stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
43+
self._stop_condition: Optional[InterpolatedBoolean] = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
4444
else:
45-
self._stop_condition = self.stop_condition # type: ignore # the type has been checked
45+
self._stop_condition = self.stop_condition
4646

4747
@property
4848
def initial_token(self) -> Optional[Any]:
4949
return None
5050

51-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
51+
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
5252
decoded_response = self.decoder.decode(response)
5353

5454
# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
5555
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
5656
headers: Dict[str, Any] = dict(response.headers)
5757
headers["link"] = response.links
58-
59-
last_record = last_records[-1] if last_records else None
60-
6158
if self._stop_condition:
6259
should_stop = self._stop_condition.eval(
6360
self.config,
6461
response=decoded_response,
6562
headers=headers,
66-
last_records=last_records,
6763
last_record=last_record,
68-
last_page_size=len(last_records),
64+
last_page_size=last_page_size,
6965
)
7066
if should_stop:
7167
return None
7268
token = self._cursor_value.eval(
7369
config=self.config,
74-
last_records=last_records,
7570
response=decoded_response,
7671
headers=headers,
7772
last_record=last_record,
78-
last_page_size=len(last_records),
73+
last_page_size=last_page_size,
7974
)
8075
return token if token else None
8176

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

+4-4
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, Optional, Union
6+
from typing import Any, Mapping, Optional, Union
77

88
import requests
99
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
@@ -56,14 +56,14 @@ def initial_token(self) -> Optional[Any]:
5656
return self._offset
5757
return None
5858

59-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
59+
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
6060
decoded_response = self.decoder.decode(response)
6161

6262
# Stop paginating when there are fewer records than the page size or the current page has no records
63-
if (self._page_size and len(last_records) < self._page_size.eval(self.config, response=decoded_response)) or len(last_records) == 0:
63+
if (self._page_size and last_page_size < self._page_size.eval(self.config, response=decoded_response)) or last_page_size == 0:
6464
return None
6565
else:
66-
self._offset += len(last_records)
66+
self._offset += last_page_size
6767
return self._offset
6868

6969
def reset(self) -> None:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py

+3-3
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, List, Mapping, Optional, Union
6+
from typing import Any, Mapping, Optional, Union
77

88
import requests
99
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
@@ -43,9 +43,9 @@ def initial_token(self) -> Optional[Any]:
4343
return self._page
4444
return None
4545

46-
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
46+
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
4747
# Stop paginating when there are fewer records than the page size or the current page has no records
48-
if (self._page_size and len(last_records) < self._page_size) or len(last_records) == 0:
48+
if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
4949
return None
5050
else:
5151
self._page += 1

0 commit comments

Comments
 (0)