Skip to content

Commit ce770d3

Browse files
authored
Catch empty state in incremental SAT (#22353)
* Catch state being empty
* Update test_two_sequential_reads to catch empty state on first read
* Add integration test of empty state
* Fix legacy state test
* Move state_name to variable
* Clean up
* Format
* Fix rogue test
1 parent ddb80cd commit ce770d3

File tree

2 files changed

+99
-3
lines changed

2 files changed

+99
-3
lines changed

airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_incremental.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def configured_catalog_for_incremental_fixture(configured_catalog) -> Configured
8383
return catalog
8484

8585

86-
def records_with_state(records, state, stream_mapping, state_cursor_paths) -> Iterable[Tuple[Any, Any]]:
86+
def records_with_state(records, state, stream_mapping, state_cursor_paths) -> Iterable[Tuple[Any, Any, Any]]:
8787
"""Iterate over records and produce, for each one, the record's cursor value, the corresponding cursor value from state, and the stream name"""
8888
for record in records:
8989
stream_name = record.record.stream
@@ -180,7 +180,13 @@ def test_two_sequential_reads(
180180
latest_state = states_1[-1].state.data
181181
state_input = states_1[-1].state.data
182182

183-
for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths):
183+
parsed_records_1 = list(records_with_state(records_1, latest_state, stream_mapping, cursor_paths))
184+
185+
# This catches the case of a connector that emits an invalid state that is not compatible with the schema
186+
# See https://github.com/airbytehq/airbyte/issues/21863 to understand more
187+
assert parsed_records_1, "At least one valid state should be produced, given a cursor path"
188+
189+
for record_value, state_value, stream_name in parsed_records_1:
184190
assert (
185191
record_value <= state_value
186192
), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}"
@@ -197,7 +203,7 @@ def test_read_sequential_slices(
197203
self, inputs: IncrementalConfig, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner
198204
):
199205
"""
200-
Incremental test that makes calls the read method without a state checkpoint. Then we partition the results by stream and
206+
Incremental test that makes calls to the read method without a state checkpoint. Then we partition the results by stream and
201207
slice checkpoints.
202208
Then we make additional read method calls using the state message and verify the correctness of the
203209
messages in the response.

airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_incremental.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,96 @@ def test_incremental_two_sequential_reads(
192192
)
193193

194194

195+
@pytest.mark.parametrize(
196+
"stream_name, cursor_type, cursor_paths, records1, records2, latest_state, expected_error",
197+
[
198+
(
199+
"test_stream",
200+
{
201+
"dateCreated": {
202+
"type": "string",
203+
"format": "date-time"
204+
}
205+
},
206+
{'test_stream': ['dateCreated']},
207+
[{"dateCreated": "2020-01-01T01:01:01.000000Z"}, {"dateCreated": "2020-01-02T01:01:01.000000Z"}],
208+
[],
209+
{"dateCreated": "2020-01-02T01:01:01.000000Z"},
210+
does_not_raise(),
211+
),
212+
(
213+
"test_stream",
214+
{
215+
"dateCreated": {
216+
"type": "string",
217+
"format": "date-time"
218+
}
219+
},
220+
{'test_stream': ['dateCreated']},
221+
[{"dateCreated": "2020-01-01T01:01:01.000000Z"}, {"dateCreated": "2020-01-02T01:01:01.000000Z"}],
222+
[],
223+
{},
224+
pytest.raises(AssertionError, match="At least one valid state should be produced, given a cursor path")
225+
),
226+
],
227+
)
228+
@pytest.mark.parametrize(
229+
"run_per_stream_test",
230+
[
231+
pytest.param(False, id="test_two_sequential_reads_using_a_mock_connector_emitting_legacy_state"),
232+
pytest.param(True, id="test_two_sequential_reads_using_a_mock_connector_emitting_per_stream_state"),
233+
],
234+
)
235+
def test_incremental_two_sequential_reads_state_invalid(
236+
stream_name, records1, records2, latest_state, cursor_type, cursor_paths, expected_error, run_per_stream_test
237+
):
238+
input_config = IncrementalConfig()
239+
catalog = ConfiguredAirbyteCatalog(
240+
streams=[
241+
ConfiguredAirbyteStream(
242+
stream=AirbyteStream(
243+
name=stream_name,
244+
json_schema={"type": "object", "properties": cursor_type},
245+
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
246+
),
247+
sync_mode=SyncMode.incremental,
248+
destination_sync_mode=DestinationSyncMode.overwrite,
249+
default_cursor_field=["dateCreated"],
250+
cursor_field=["dateCreated"],
251+
)
252+
]
253+
)
254+
255+
if run_per_stream_test:
256+
call_read_output_messages = [
257+
*build_messages_from_record_data(stream_name, records1),
258+
build_per_stream_state_message(descriptor=StreamDescriptor(name=stream_name), stream_state=latest_state),
259+
]
260+
else:
261+
stream_state = dict()
262+
stream_state[stream_name] = latest_state
263+
call_read_output_messages = [
264+
*build_messages_from_record_data(stream_name, records1),
265+
build_state_message(stream_state),
266+
]
267+
268+
call_read_with_state_output_messages = build_messages_from_record_data(stream_name, records2)
269+
270+
docker_runner_mock = MagicMock()
271+
docker_runner_mock.call_read.return_value = call_read_output_messages
272+
docker_runner_mock.call_read_with_state.return_value = call_read_with_state_output_messages
273+
274+
t = _TestIncremental()
275+
with expected_error:
276+
t.test_two_sequential_reads(
277+
inputs=input_config,
278+
connector_config=MagicMock(),
279+
configured_catalog_for_incremental=catalog,
280+
cursor_paths=cursor_paths,
281+
docker_runner=docker_runner_mock,
282+
)
283+
284+
195285
@pytest.mark.parametrize(
196286
"records, state_records, threshold_days, expected_error",
197287
[

0 commit comments

Comments
 (0)