Skip to content

Commit 66b8f5a

Browse files
authored
Low-Code CDK: SubstreamSlicer.parent_key - dpath support added (#21900)
Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent 4061939 commit 66b8f5a

File tree

5 files changed

+79
-29
lines changed

5 files changed

+79
-29
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Co
636636
request_option=request_option,
637637
stream=declarative_stream,
638638
stream_slice_field=model.stream_slice_field,
639+
config=config,
639640
options=model.options,
640641
)
641642

@@ -726,7 +727,7 @@ def create_substream_slicer(self, model: SubstreamSlicerModel, config: Config, *
726727
]
727728
)
728729

729-
return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options)
730+
return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options, config=config)
730731

731732
@staticmethod
732733
def create_wait_time_from_header(model: WaitTimeFromHeaderModel, config: Config, **kwargs) -> WaitTimeFromHeaderBackoffStrategy:

airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
#
44

55
from dataclasses import InitVar, dataclass
6-
from typing import Any, Iterable, List, Mapping, Optional
6+
from typing import Any, Iterable, List, Mapping, Optional, Union
77

8+
import dpath.util
89
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
911
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
1012
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
11-
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
13+
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
1214
from airbyte_cdk.sources.streams.core import Stream
1315
from dataclasses_jsonschema import JsonSchemaMixin
1416

@@ -25,11 +27,16 @@ class ParentStreamConfig(JsonSchemaMixin):
2527
"""
2628

2729
stream: Stream
28-
parent_key: str
29-
stream_slice_field: str
30+
parent_key: Union[InterpolatedString, str]
31+
stream_slice_field: Union[InterpolatedString, str]
32+
config: Config
3033
options: InitVar[Mapping[str, Any]]
3134
request_option: Optional[RequestOption] = None
3235

36+
def __post_init__(self, options: Mapping[str, Any]):
37+
self.parent_key = InterpolatedString.create(self.parent_key, options=options)
38+
self.stream_slice_field = InterpolatedString.create(self.stream_slice_field, options=options)
39+
3340

3441
@dataclass
3542
class SubstreamSlicer(StreamSlicer, JsonSchemaMixin):
@@ -42,6 +49,7 @@ class SubstreamSlicer(StreamSlicer, JsonSchemaMixin):
4249
"""
4350

4451
parent_stream_configs: List[ParentStreamConfig]
52+
config: Config
4553
options: InitVar[Mapping[str, Any]]
4654

4755
def __post_init__(self, options: Mapping[str, Any]):
@@ -54,9 +62,10 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record]
5462
# This method is called after the records are processed.
5563
cursor = {}
5664
for parent_stream_config in self.parent_stream_configs:
57-
slice_value = stream_slice.get(parent_stream_config.stream_slice_field)
65+
stream_slice_field = parent_stream_config.stream_slice_field.eval(self.config)
66+
slice_value = stream_slice.get(stream_slice_field)
5867
if slice_value:
59-
cursor.update({parent_stream_config.stream_slice_field: slice_value})
68+
cursor.update({stream_slice_field: slice_value})
6069
self._cursor = cursor
6170

6271
def get_request_params(
@@ -100,7 +109,7 @@ def _get_request_option(self, option_type: RequestOptionType, stream_slice: Stre
100109
if stream_slice:
101110
for parent_config in self.parent_stream_configs:
102111
if parent_config.request_option and parent_config.request_option.inject_into == option_type:
103-
key = parent_config.stream_slice_field
112+
key = parent_config.stream_slice_field.eval(self.config)
104113
value = stream_slice.get(key)
105114
if value:
106115
params.update({key: value})
@@ -129,8 +138,8 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
129138
else:
130139
for parent_stream_config in self.parent_stream_configs:
131140
parent_stream = parent_stream_config.stream
132-
parent_field = parent_stream_config.parent_key
133-
stream_state_field = parent_stream_config.stream_slice_field
141+
parent_field = parent_stream_config.parent_key.eval(self.config)
142+
stream_state_field = parent_stream_config.stream_slice_field.eval(self.config)
134143
for parent_stream_slice in parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=None, stream_state=stream_state):
135144
empty_parent_slice = True
136145
parent_slice = parent_stream_slice
@@ -144,9 +153,14 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
144153
parent_record = parent_record.record.data
145154
else:
146155
continue
147-
empty_parent_slice = False
148-
stream_state_value = parent_record.get(parent_field)
149-
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
156+
157+
try:
158+
stream_state_value = dpath.util.get(parent_record, parent_field)
159+
except KeyError:
160+
pass
161+
else:
162+
empty_parent_slice = False
163+
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
150164
# If the parent slice contains no records,
151165
if empty_parent_slice:
152166
yield from []

airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -383,13 +383,13 @@ def test_create_substream_slicer():
383383
assert isinstance(parent_stream_configs[0].stream, DeclarativeStream)
384384
assert isinstance(parent_stream_configs[1].stream, DeclarativeStream)
385385

386-
assert stream_slicer.parent_stream_configs[0].parent_key == "id"
387-
assert stream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
386+
assert stream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
387+
assert stream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
388388
assert stream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
389389
assert stream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"
390390

391-
assert stream_slicer.parent_stream_configs[1].parent_key == "someid"
392-
assert stream_slicer.parent_stream_configs[1].stream_slice_field == "word_id"
391+
assert stream_slicer.parent_stream_configs[1].parent_key.eval({}) == "someid"
392+
assert stream_slicer.parent_stream_configs[1].stream_slice_field.eval({}) == "word_id"
393393
assert stream_slicer.parent_stream_configs[1].request_option is None
394394

395395

@@ -865,8 +865,8 @@ def test_custom_components_do_not_contain_extra_fields():
865865
assert isinstance(custom_substream_slicer, TestingCustomSubstreamSlicer)
866866

867867
assert len(custom_substream_slicer.parent_stream_configs) == 1
868-
assert custom_substream_slicer.parent_stream_configs[0].parent_key == "id"
869-
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
868+
assert custom_substream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
869+
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
870870
assert custom_substream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
871871
assert custom_substream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"
872872

@@ -911,8 +911,8 @@ def test_parse_custom_component_fields_if_subcomponent():
911911
assert custom_substream_slicer.custom_field == "here"
912912

913913
assert len(custom_substream_slicer.parent_stream_configs) == 1
914-
assert custom_substream_slicer.parent_stream_configs[0].parent_key == "id"
915-
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
914+
assert custom_substream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
915+
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
916916
assert custom_substream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
917917
assert custom_substream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"
918918

airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def read_records(
6363
"test_single_parent_slices_no_records",
6464
[
6565
ParentStreamConfig(
66-
stream=MockStream([{}], [], "first_stream"), parent_key="id", stream_slice_field="first_stream_id", options={}
66+
stream=MockStream([{}], [], "first_stream"), parent_key="id", stream_slice_field="first_stream_id", options={}, config={},
6767
)
6868
],
6969
[],
@@ -76,6 +76,7 @@ def read_records(
7676
parent_key="id",
7777
stream_slice_field="first_stream_id",
7878
options={},
79+
config={},
7980
)
8081
],
8182
[{"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 2, "parent_slice": {}}],
@@ -88,6 +89,7 @@ def read_records(
8889
parent_key="id",
8990
stream_slice_field="first_stream_id",
9091
options={},
92+
config={},
9193
)
9294
],
9395
[
@@ -104,12 +106,14 @@ def read_records(
104106
parent_key="id",
105107
stream_slice_field="first_stream_id",
106108
options={},
109+
config={},
107110
),
108111
ParentStreamConfig(
109112
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
110113
parent_key="id",
111114
stream_slice_field="second_stream_id",
112115
options={},
116+
config={},
113117
),
114118
],
115119
[
@@ -120,16 +124,42 @@ def read_records(
120124
{"parent_slice": {"slice": "second_parent"}, "second_stream_id": 20},
121125
],
122126
),
127+
(
128+
"test_missed_parent_key",
129+
[
130+
ParentStreamConfig(
131+
stream=MockStream([{}], [{"id": 0}, {"id": 1}, {"_id": 2}, {"id": 3}], "first_stream"),
132+
parent_key="id",
133+
stream_slice_field="first_stream_id",
134+
options={},
135+
config={},
136+
)
137+
],
138+
[{"first_stream_id": 0, "parent_slice": {}}, {"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 3, "parent_slice": {}}],
139+
),
140+
(
141+
"test_dpath_extraction",
142+
[
143+
ParentStreamConfig(
144+
stream=MockStream([{}], [{"a": {"b": 0}}, {"a": {"b": 1}}, {"a": {"c": 2}}, {"a": {"b": 3}}], "first_stream"),
145+
parent_key="a/b",
146+
stream_slice_field="first_stream_id",
147+
options={},
148+
config={},
149+
)
150+
],
151+
[{"first_stream_id": 0, "parent_slice": {}}, {"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 3, "parent_slice": {}}],
152+
),
123153
],
124154
)
125155
def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
126156
if expected_slices is None:
127157
try:
128-
SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={})
158+
SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={}, config={})
129159
assert False
130160
except ValueError:
131161
return
132-
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={})
162+
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={}, config={})
133163
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
134164
assert slices == expected_slices
135165

@@ -154,16 +184,18 @@ def test_update_cursor(test_name, stream_slice, expected_state):
154184
parent_key="id",
155185
stream_slice_field="first_stream_id",
156186
options={},
187+
config={},
157188
),
158189
ParentStreamConfig(
159190
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
160191
parent_key="id",
161192
stream_slice_field="second_stream_id",
162193
options={},
194+
config={},
163195
),
164196
]
165197

166-
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_name_to_config, options={})
198+
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_name_to_config, options={}, config={})
167199
slicer.update_cursor(stream_slice, None)
168200
updated_state = slicer.get_stream_state()
169201
assert expected_state == updated_state
@@ -244,17 +276,20 @@ def test_request_option(
244276
parent_key="id",
245277
stream_slice_field="first_stream_id",
246278
options={},
279+
config={},
247280
request_option=parent_stream_request_options[0],
248281
),
249282
ParentStreamConfig(
250283
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
251284
parent_key="id",
252285
stream_slice_field="second_stream_id",
253286
options={},
287+
config={},
254288
request_option=parent_stream_request_options[1],
255289
),
256290
],
257291
options={},
292+
config={},
258293
)
259294
stream_slice = {"first_stream_id": "1234", "second_stream_id": "4567"}
260295

airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,13 @@ def test_create_substream_slicer():
188188
assert len(parent_stream_configs) == 2
189189
assert isinstance(parent_stream_configs[0].stream, DeclarativeStream)
190190
assert isinstance(parent_stream_configs[1].stream, DeclarativeStream)
191-
assert stream_slicer.parent_stream_configs[0].parent_key == "id"
192-
assert stream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
191+
assert stream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
192+
assert stream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
193193
assert stream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
194194
assert stream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"
195195

196-
assert stream_slicer.parent_stream_configs[1].parent_key == "someid"
197-
assert stream_slicer.parent_stream_configs[1].stream_slice_field == "word_id"
196+
assert stream_slicer.parent_stream_configs[1].parent_key.eval({}) == "someid"
197+
assert stream_slicer.parent_stream_configs[1].stream_slice_field.eval({}) == "word_id"
198198
assert stream_slicer.parent_stream_configs[1].request_option is None
199199

200200

0 commit comments

Comments
 (0)