Skip to content

Commit 2787acc

Browse files
maxi297 authored and jatinyadav-cc committed
Have StateBuilder return our actual state object and not simply a dict (airbytehq#34625)
1 parent fbccc9d commit 2787acc

File tree

5 files changed: 79 additions and 149 deletions

airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,16 @@
2626
from airbyte_cdk.exception_handler import assemble_uncaught_exception
2727
from airbyte_cdk.logger import AirbyteLogFormatter
2828
from airbyte_cdk.sources import Source
29-
from airbyte_protocol.models import AirbyteLogMessage, AirbyteMessage, AirbyteStreamStatus, ConfiguredAirbyteCatalog, Level, TraceType, Type
29+
from airbyte_protocol.models import (
30+
AirbyteLogMessage,
31+
AirbyteMessage,
32+
AirbyteStateMessage,
33+
AirbyteStreamStatus,
34+
ConfiguredAirbyteCatalog,
35+
Level,
36+
TraceType,
37+
Type,
38+
)
3039
from pydantic.error_wrappers import ValidationError
3140

3241

@@ -104,7 +113,7 @@ def read(
104113
source: Source,
105114
config: Mapping[str, Any],
106115
catalog: ConfiguredAirbyteCatalog,
107-
state: Optional[Any] = None,
116+
state: Optional[List[AirbyteStateMessage]] = None,
108117
expecting_exception: bool = False,
109118
) -> EntrypointOutput:
110119
"""
@@ -133,7 +142,7 @@ def read(
133142
args.extend(
134143
[
135144
"--state",
136-
make_file(tmp_directory_path / "state.json", state),
145+
make_file(tmp_directory_path / "state.json", f"[{','.join([stream_state.json() for stream_state in state])}]"),
137146
]
138147
)
139148
args.append("--debug")

airbyte-cdk/python/airbyte_cdk/test/state_builder.py

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,23 +1,25 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
from typing import Any, Dict, List
3+
from typing import Any, List
4+
5+
from airbyte_protocol.models import AirbyteStateMessage
46

57

68
class StateBuilder:
79
def __init__(self) -> None:
8-
self._state: List[Dict[str, Any]] = []
10+
self._state: List[AirbyteStateMessage] = []
911

1012
def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
11-
self._state.append({
13+
self._state.append(AirbyteStateMessage.parse_obj({
1214
"type": "STREAM",
1315
"stream": {
1416
"stream_state": state,
1517
"stream_descriptor": {
1618
"name": stream_name
1719
}
1820
}
19-
})
21+
}))
2022
return self
2123

22-
def build(self) -> List[Dict[str, Any]]:
24+
def build(self) -> List[AirbyteStateMessage]:
2325
return self._state

airbyte-cdk/python/unit_tests/sources/file_based/scenarios/incremental_scenarios.py

Lines changed: 52 additions & 127 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
from airbyte_cdk.test.state_builder import StateBuilder
56
from unit_tests.sources.file_based.helpers import LowHistoryLimitCursor
67
from unit_tests.sources.file_based.scenarios.file_based_source_builder import FileBasedSourceBuilder
78
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
@@ -39,17 +40,9 @@
3940
)
4041
.set_incremental_scenario_config(
4142
IncrementalScenarioConfig(
42-
input_state=[
43-
{
44-
"type": "STREAM",
45-
"stream": {
46-
"stream_state": {
47-
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
48-
},
49-
"stream_descriptor": {"name": "stream1"},
50-
},
51-
}
52-
],
43+
input_state=StateBuilder().with_stream_state("stream1", {
44+
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
45+
}).build(),
5346
)
5447
)
5548
.set_expected_records(
@@ -140,17 +133,9 @@
140133
)
141134
.set_incremental_scenario_config(
142135
IncrementalScenarioConfig(
143-
input_state=[
144-
{
145-
"type": "STREAM",
146-
"stream": {
147-
"stream_state": {
148-
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
149-
},
150-
"stream_descriptor": {"name": "stream1"},
151-
},
152-
}
153-
],
136+
input_state=StateBuilder().with_stream_state("stream1", {
137+
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
138+
}).build(),
154139
)
155140
)
156141
.set_expected_records(
@@ -223,17 +208,9 @@
223208
)
224209
.set_incremental_scenario_config(
225210
IncrementalScenarioConfig(
226-
input_state=[
227-
{
228-
"type": "STREAM",
229-
"stream": {
230-
"stream_state": {
231-
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
232-
},
233-
"stream_descriptor": {"name": "stream1"},
234-
},
235-
}
236-
],
211+
input_state=StateBuilder().with_stream_state("stream1", {
212+
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
213+
}).build(),
237214
)
238215
)
239216
.set_expected_records(
@@ -377,7 +354,7 @@
377354
)
378355
.set_incremental_scenario_config(
379356
IncrementalScenarioConfig(
380-
input_state=[],
357+
input_state=StateBuilder().build(),
381358
)
382359
)
383360
).build()
@@ -499,7 +476,7 @@
499476
)
500477
.set_incremental_scenario_config(
501478
IncrementalScenarioConfig(
502-
input_state=[],
479+
input_state=StateBuilder().build(),
503480
)
504481
)
505482
).build()
@@ -593,15 +570,9 @@
593570
)
594571
.set_incremental_scenario_config(
595572
IncrementalScenarioConfig(
596-
input_state=[
597-
{
598-
"type": "STREAM",
599-
"stream": {
600-
"stream_state": {"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"}},
601-
"stream_descriptor": {"name": "stream1"},
602-
},
603-
}
604-
],
573+
input_state=StateBuilder().with_stream_state("stream1", {
574+
"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"},
575+
}).build(),
605576
)
606577
)
607578
).build()
@@ -731,7 +702,7 @@
731702
)
732703
.set_incremental_scenario_config(
733704
IncrementalScenarioConfig(
734-
input_state=[],
705+
input_state=StateBuilder().build(),
735706
)
736707
)
737708
).build()
@@ -891,7 +862,7 @@
891862
)
892863
.set_incremental_scenario_config(
893864
IncrementalScenarioConfig(
894-
input_state=[],
865+
input_state=StateBuilder().build(),
895866
)
896867
)
897868
).build()
@@ -1035,15 +1006,9 @@
10351006
)
10361007
.set_incremental_scenario_config(
10371008
IncrementalScenarioConfig(
1038-
input_state=[
1039-
{
1040-
"type": "STREAM",
1041-
"stream": {
1042-
"stream_state": {"history": {"a.csv": "2023-06-05T03:54:07.000000Z"}},
1043-
"stream_descriptor": {"name": "stream1"},
1044-
},
1045-
}
1046-
],
1009+
input_state=StateBuilder().with_stream_state("stream1", {
1010+
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
1011+
}).build(),
10471012
)
10481013
)
10491014
).build()
@@ -1163,17 +1128,9 @@
11631128
)
11641129
.set_incremental_scenario_config(
11651130
IncrementalScenarioConfig(
1166-
input_state=[
1167-
{
1168-
"type": "STREAM",
1169-
"stream": {
1170-
"stream_state": {
1171-
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
1172-
},
1173-
"stream_descriptor": {"name": "stream1"},
1174-
},
1175-
}
1176-
],
1131+
input_state=StateBuilder().with_stream_state("stream1", {
1132+
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
1133+
}).build(),
11771134
)
11781135
)
11791136
).build()
@@ -1348,21 +1305,13 @@
13481305
)
13491306
.set_incremental_scenario_config(
13501307
IncrementalScenarioConfig(
1351-
input_state=[
1352-
{
1353-
"type": "STREAM",
1354-
"stream": {
1355-
"stream_state": {
1356-
"history": {
1357-
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
1358-
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
1359-
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
1360-
},
1361-
},
1362-
"stream_descriptor": {"name": "stream1"},
1363-
},
1364-
}
1365-
],
1308+
input_state=StateBuilder().with_stream_state("stream1", {
1309+
"history": {
1310+
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
1311+
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
1312+
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
1313+
},
1314+
}).build(),
13661315
)
13671316
)
13681317
).build()
@@ -1546,7 +1495,7 @@
15461495
)
15471496
.set_incremental_scenario_config(
15481497
IncrementalScenarioConfig(
1549-
input_state=[],
1498+
input_state=StateBuilder().build(),
15501499
)
15511500
)
15521501
).build()
@@ -1652,21 +1601,13 @@
16521601
)
16531602
.set_incremental_scenario_config(
16541603
IncrementalScenarioConfig(
1655-
input_state=[
1656-
{
1657-
"type": "STREAM",
1658-
"stream": {
1659-
"stream_state": {
1660-
"history": {
1661-
"b.csv": "2023-06-05T03:54:07.000000Z",
1662-
"c.csv": "2023-06-05T03:54:07.000000Z",
1663-
"d.csv": "2023-06-05T03:54:07.000000Z",
1664-
},
1665-
},
1666-
"stream_descriptor": {"name": "stream1"},
1667-
},
1668-
}
1669-
],
1604+
input_state=StateBuilder().with_stream_state("stream1", {
1605+
"history": {
1606+
"b.csv": "2023-06-05T03:54:07.000000Z",
1607+
"c.csv": "2023-06-05T03:54:07.000000Z",
1608+
"d.csv": "2023-06-05T03:54:07.000000Z",
1609+
},
1610+
}).build(),
16701611
)
16711612
)
16721613
).build()
@@ -1794,21 +1735,13 @@
17941735
)
17951736
.set_incremental_scenario_config(
17961737
IncrementalScenarioConfig(
1797-
input_state=[
1798-
{
1799-
"type": "STREAM",
1800-
"stream": {
1801-
"stream_state": {
1802-
"history": {
1803-
"c.csv": "2023-06-07T03:54:07.000000Z",
1804-
"d.csv": "2023-06-08T03:54:07.000000Z",
1805-
"e.csv": "2023-06-08T03:54:07.000000Z",
1806-
},
1807-
},
1808-
"stream_descriptor": {"name": "stream1"},
1809-
},
1810-
}
1811-
],
1738+
input_state=StateBuilder().with_stream_state("stream1", {
1739+
"history": {
1740+
"c.csv": "2023-06-07T03:54:07.000000Z",
1741+
"d.csv": "2023-06-08T03:54:07.000000Z",
1742+
"e.csv": "2023-06-08T03:54:07.000000Z",
1743+
},
1744+
}).build(),
18121745
)
18131746
)
18141747
).build()
@@ -1962,21 +1895,13 @@
19621895
)
19631896
.set_incremental_scenario_config(
19641897
IncrementalScenarioConfig(
1965-
input_state=[
1966-
{
1967-
"type": "STREAM",
1968-
"stream": {
1969-
"stream_state": {
1970-
"history": {
1971-
"old_file.csv": "2023-06-05T00:00:00.000000Z",
1972-
"c.csv": "2023-06-07T03:54:07.000000Z",
1973-
"d.csv": "2023-06-08T03:54:07.000000Z",
1974-
},
1975-
},
1976-
"stream_descriptor": {"name": "stream1"},
1977-
},
1978-
}
1979-
],
1898+
input_state=StateBuilder().with_stream_state("stream1", {
1899+
"history": {
1900+
"old_file.csv": "2023-06-05T00:00:00.000000Z",
1901+
"c.csv": "2023-06-07T03:54:07.000000Z",
1902+
"d.csv": "2023-06-08T03:54:07.000000Z",
1903+
},
1904+
}).build(),
19801905
)
19811906
)
19821907
).build()

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py

Lines changed: 6 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#
44
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
55
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
6+
from airbyte_cdk.test.state_builder import StateBuilder
67
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
78
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
89
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
@@ -85,7 +86,7 @@
8586
)
8687

8788

88-
LEGACY_STATE = [{"type": "STREAM", "stream": {"stream_state": {"cursor_field": 0}, "stream_descriptor": {"name": "stream1"}}}]
89+
LEGACY_STATE = StateBuilder().with_stream_state("stream1", {"cursor_field": 0}).build()
8990
test_incremental_stream_without_slice_boundaries_with_legacy_state = (
9091
TestScenarioBuilder()
9192
.set_name("test_incremental_stream_without_slice_boundaries_with_legacy_state")
@@ -162,18 +163,10 @@
162163
)
163164

164165

165-
CONCURRENT_STATE = [
166-
{
167-
"type": "STREAM",
168-
"stream": {
169-
"stream_state": {
170-
"slices": [{"start": 0, "end": 0}],
171-
"state_type": ConcurrencyCompatibleStateType.date_range.value,
172-
},
173-
"stream_descriptor": {"name": "stream1"},
174-
},
175-
},
176-
]
166+
CONCURRENT_STATE = StateBuilder().with_stream_state("stream1", {
167+
"slices": [{"start": 0, "end": 0}],
168+
"state_type": ConcurrencyCompatibleStateType.date_range.value,
169+
}).build()
177170
test_incremental_stream_without_slice_boundaries_with_concurrent_state = (
178171
TestScenarioBuilder()
179172
.set_name("test_incremental_stream_without_slice_boundaries_with_concurrent_state")

0 commit comments

Comments
 (0)