Skip to content

Commit a612248

Browse files
authored
support semi incremental by adding extractor record filter (#13520)
* Support semi-incremental syncs by adding an extractor record filter.
* Refactor the extractor into a record_selector that supports both extraction and filtering of response records.
1 parent c6d83b3 commit a612248

File tree

11 files changed

+216
-29
lines changed

11 files changed

+216
-29
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_extractor.py

-15
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Any, List, Mapping
7+
8+
import requests
9+
from airbyte_cdk.sources.declarative.types import Record
10+
11+
12+
class HttpSelector(ABC):
    """
    Interface for components that turn an HTTP response into a list of records.
    """

    @abstractmethod
    def select_records(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> List[Record]:
        """
        Select the records carried by ``response``.

        :param response: HTTP response to read the records from
        :param stream_state: stream state supplied by the caller
        :param stream_slice: stream slice supplied by the caller, if any
        :param next_page_token: page token supplied by the caller, if any
        :return: the selected records
        """

airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66

77
import requests
88
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
9-
from airbyte_cdk.sources.declarative.extractors.http_extractor import HttpExtractor
109
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
1110
from airbyte_cdk.sources.declarative.types import Record
1211
from jello import lib as jello_lib
1312

1413

15-
class JelloExtractor(HttpExtractor):
14+
class JelloExtractor:
1615
default_transform = "."
1716

1817
def __init__(self, transform: str, decoder: Decoder, config, kwargs=None):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, List, Mapping
6+
7+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
8+
from airbyte_cdk.sources.declarative.types import Record
9+
10+
11+
class RecordFilter:
    """
    Keeps only the records for which an interpolated boolean condition evaluates to a truthy value.
    """

    def __init__(self, config, condition: str = None):
        """
        :param config: configuration handed to the interpolator on every evaluation
        :param condition: condition string wrapped in an InterpolatedBoolean
        """
        self._config = config
        self._filter_interpolator = InterpolatedBoolean(condition)

    def filter_records(
        self,
        records: List[Record],
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> List[Record]:
        """
        Evaluate the condition once per record and return the records that pass, in their original order.

        :param records: candidate records
        :param stream_state: exposed to the condition as ``stream_state``
        :param stream_slice: exposed to the condition as ``stream_slice``
        :param next_page_token: exposed to the condition as ``next_page_token``
        :return: the records for which the condition evaluated truthy
        """
        kept = []
        for candidate in records:
            # The record under test is exposed to the condition as ``record``.
            matches = self._filter_interpolator.eval(
                self._config,
                record=candidate,
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            if matches:
                kept.append(candidate)
        return kept
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, List, Mapping
6+
7+
import requests
8+
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
9+
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
10+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
11+
from airbyte_cdk.sources.declarative.types import Record
12+
13+
14+
class RecordSelector(HttpSelector):
    """
    Translates an HTTP response into records in two steps: an extractor pulls the records out of the response,
    then an optional record filter narrows them down.
    """

    def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None):
        """
        :param extractor: object whose ``extract_records`` reads the records out of a response
        :param record_filter: optional filter applied to the extracted records
        """
        self._extractor = extractor
        self._record_filter = record_filter

    def select_records(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> List[Record]:
        """
        Extract the records from ``response`` and, when a record filter is set, return only those it keeps.

        :param response: HTTP response to extract records from
        :param stream_state: forwarded to the record filter
        :param stream_slice: forwarded to the record filter
        :param next_page_token: forwarded to the record filter
        :return: the extracted records, filtered when a record filter is configured
        """
        extracted = self._extractor.extract_records(response)
        # Without a filter every extracted record is returned untouched.
        if not self._record_filter:
            return extracted
        return self._record_filter.filter_records(
            extracted,
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/conditional_paginator.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ class ConditionalPaginator:
1515
A paginator that performs pagination by incrementing a page number and stops based on a provided stop condition.
1616
"""
1717

18-
def __init__(self, stop_condition_template: str, state: DictState, decoder: Decoder, config):
19-
self._stop_condition_template = InterpolatedBoolean(stop_condition_template)
18+
def __init__(self, stop_condition: str, state: DictState, decoder: Decoder, config):
19+
self._stop_condition_interpolator = InterpolatedBoolean(stop_condition)
2020
self._state: DictState = state
2121
self._decoder = decoder
2222
self._config = config
2323

2424
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
2525
decoded_response = self._decoder.decode(response)
2626
headers = response.headers
27-
should_stop = self._stop_condition_template.eval(
27+
should_stop = self._stop_condition_interpolator.eval(
2828
self._config, decoded_response=decoded_response, headers=headers, last_records=last_records
2929
)
3030

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import requests
88
from airbyte_cdk.models import SyncMode
9-
from airbyte_cdk.sources.declarative.extractors.http_extractor import HttpExtractor
9+
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
1010
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
1111
from airbyte_cdk.sources.declarative.requesters.requester import Requester
1212
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
@@ -22,15 +22,15 @@ def __init__(
2222
primary_key,
2323
requester: Requester,
2424
paginator: Paginator,
25-
extractor: HttpExtractor,
25+
record_selector: HttpSelector,
2626
stream_slicer: StreamSlicer,
2727
state: State,
2828
):
2929
self._name = name
3030
self._primary_key = primary_key
3131
self._paginator = paginator
3232
self._requester = requester
33-
self._extractor = extractor
33+
self._record_selector = record_selector
3434
super().__init__(self._requester.get_authenticator())
3535
self._iterator: StreamSlicer = stream_slicer
3636
self._state: State = state.deep_copy()
@@ -190,7 +190,9 @@ def parse_response(
190190
next_page_token: Mapping[str, Any] = None,
191191
) -> Iterable[Mapping]:
192192
self._last_response = response
193-
records = self._extractor.extract_records(response)
193+
records = self._record_selector.select_records(
194+
response=response, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
195+
)
194196
self._last_records = records
195197
return records
196198

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import pytest
6+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
7+
8+
9+
@pytest.mark.parametrize(
    "test_name, filter_template, records, expected_records",
    [
        (
            "test_using_state_filter",
            "{{ record['created_at'] > stream_state['created_at'] }}",
            [
                {"id": 1, "created_at": "06-06-21"},
                {"id": 2, "created_at": "06-07-21"},
                {"id": 3, "created_at": "06-08-21"},
            ],
            [
                {"id": 2, "created_at": "06-07-21"},
                {"id": 3, "created_at": "06-08-21"},
            ],
        ),
        (
            "test_with_slice_filter",
            "{{ record['last_seen'] >= stream_slice['last_seen'] }}",
            [
                {"id": 1, "last_seen": "06-06-21"},
                {"id": 2, "last_seen": "06-07-21"},
                {"id": 3, "last_seen": "06-10-21"},
            ],
            [{"id": 3, "last_seen": "06-10-21"}],
        ),
        (
            "test_with_next_page_token_filter",
            "{{ record['id'] >= next_page_token['last_seen_id'] }}",
            [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
            [{"id": 14}, {"id": 15}],
        ),
        (
            "test_missing_filter_fields_return_no_results",
            "{{ record['id'] >= next_page_token['path_to_nowhere'] }}",
            [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
            [],
        ),
    ],
)
def test_record_filter(test_name, filter_template, records, expected_records):
    """
    Check that RecordFilter keeps exactly the expected records for each condition template.

    Every case runs against the same stream state, stream slice and next page token; ``test_name`` only labels the case.
    """
    interpolation_inputs = {
        "stream_state": {"created_at": "06-06-21"},
        "stream_slice": {"last_seen": "06-10-21"},
        "next_page_token": {"last_seen_id": 14},
    }
    record_filter = RecordFilter(config={"response_override": "stop_if_you_see_me"}, condition=filter_template)

    assert record_filter.filter_records(records, **interpolation_inputs) == expected_records
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import json
6+
7+
import pytest
8+
import requests
9+
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
10+
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
11+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
12+
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
13+
14+
15+
@pytest.mark.parametrize(
    "test_name, transform_template, filter_template, body, expected_records",
    [
        (
            "test_with_extractor_and_filter",
            "_.data",
            "{{ record['created_at'] > stream_state['created_at'] }}",
            {
                "data": [
                    {"id": 1, "created_at": "06-06-21"},
                    {"id": 2, "created_at": "06-07-21"},
                    {"id": 3, "created_at": "06-08-21"},
                ]
            },
            [
                {"id": 2, "created_at": "06-07-21"},
                {"id": 3, "created_at": "06-08-21"},
            ],
        ),
        (
            "test_no_record_filter_returns_all_records",
            "_.data",
            None,
            {
                "data": [
                    {"id": 1, "created_at": "06-06-21"},
                    {"id": 2, "created_at": "06-07-21"},
                ]
            },
            [
                {"id": 1, "created_at": "06-06-21"},
                {"id": 2, "created_at": "06-07-21"},
            ],
        ),
    ],
)
def test_record_filter(test_name, transform_template, filter_template, body, expected_records):
    """
    Check RecordSelector end to end: a JSON response is extracted with a JelloExtractor and, when a filter
    template is given, narrowed by a RecordFilter.

    NOTE(review): despite its name this test exercises RecordSelector.select_records; consider renaming it
    to test_record_selector.
    """
    config = {"response_override": "stop_if_you_see_me"}

    response = create_response(body)
    extractor = JelloExtractor(transform=transform_template, decoder=JsonDecoder(), config=config, kwargs={})
    # A missing template means the selector is built without any filter.
    record_filter = None if filter_template is None else RecordFilter(config=config, condition=filter_template)
    record_selector = RecordSelector(extractor=extractor, record_filter=record_filter)

    actual_records = record_selector.select_records(
        response=response,
        stream_state={"created_at": "06-06-21"},
        stream_slice={"last_seen": "06-10-21"},
        next_page_token={"last_seen_id": 14},
    )
    assert actual_records == expected_records
53+
54+
55+
def create_response(body):
    """
    Build a ``requests.Response`` whose content is ``body`` serialized to JSON and encoded as UTF-8.
    """
    payload = json.dumps(body).encode("utf-8")
    fake_response = requests.Response()
    # Set the private content buffer directly so no HTTP call is needed.
    fake_response._content = payload
    return fake_response

airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ def test():
2222
next_page_token = {"cursor": "cursor_value"}
2323
paginator.next_page_token.return_value = next_page_token
2424

25-
extractor = MagicMock()
26-
extractor.extract_records.return_value = records
25+
record_selector = MagicMock()
26+
record_selector.select_records.return_value = records
2727

2828
iterator = MagicMock()
2929
stream_slices = [{"date": "2022-01-01"}, {"date": "2022-01-02"}]
@@ -62,7 +62,7 @@ def test():
6262
use_cache = True
6363
requester.use_cache = use_cache
6464

65-
retriever = SimpleRetriever("stream_name", primary_key, requester, paginator, extractor, iterator, state)
65+
retriever = SimpleRetriever("stream_name", primary_key, requester, paginator, record_selector, iterator, state)
6666

6767
# hack because we clone the state...
6868
retriever._state = state

airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
66
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
7+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
8+
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
79
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
810
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
911
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
@@ -86,6 +88,13 @@ def test_full_config():
8688
extractor:
8789
class_name: airbyte_cdk.sources.declarative.extractors.jello.JelloExtractor
8890
decoder: "*ref(decoder)"
91+
selector:
92+
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
93+
extractor:
94+
decoder: "*ref(decoder)"
95+
record_filter:
96+
class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter
97+
condition: "{{ record['id'] > stream_state['id'] }}"
8998
metadata_paginator:
9099
class_name: "airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator.NextPageUrlPaginator"
91100
next_page_token_template:
@@ -139,6 +148,8 @@ def test_full_config():
139148
default: "marketing/lists"
140149
paginator:
141150
ref: "*ref(metadata_paginator)"
151+
record_selector:
152+
ref: "*ref(selector)"
142153
check:
143154
class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream
144155
stream_names: ["list_stream"]
@@ -156,8 +167,11 @@ def test_full_config():
156167
assert type(stream._retriever) == SimpleRetriever
157168
assert stream._retriever._requester._method == HttpMethod.GET
158169
assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"]
159-
assert type(stream._retriever._extractor._decoder) == JsonDecoder
160-
assert stream._retriever._extractor._transform == ".result[]"
170+
assert type(stream._retriever._record_selector) == RecordSelector
171+
assert type(stream._retriever._record_selector._extractor._decoder) == JsonDecoder
172+
assert stream._retriever._record_selector._extractor._transform == ".result[]"
173+
assert type(stream._retriever._record_selector._record_filter) == RecordFilter
174+
assert stream._retriever._record_selector._record_filter._filter_interpolator._condition == "{{ record['id'] > stream_state['id'] }}"
161175
assert stream._schema_loader._file_path._string == "./source_sendgrid/schemas/lists.json"
162176

163177
checker = factory.create_component(config["check"], input_config)()

0 commit comments

Comments
 (0)