@@ -170,7 +170,9 @@ def test_full_refresh_read_a_single_slice_with_debug(constructor):
170
170
* records ,
171
171
]
172
172
173
- # Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
173
+ # Synchronous streams emit a final state message to indicate that the stream has finished reading
174
+ # Concurrent streams don't emit their own state messages - the concurrent source observes the cursor
175
+ # and emits the state messages. Therefore, we can only check the value of the cursor's state at the end
174
176
if constructor == _stream :
175
177
expected_records .append (
176
178
AirbyteMessage (
@@ -187,6 +189,10 @@ def test_full_refresh_read_a_single_slice_with_debug(constructor):
187
189
188
190
actual_records = _read (stream , configured_stream , logger , slice_logger , message_repository , state_manager , internal_config )
189
191
192
+ if constructor == _concurrent_stream :
193
+ assert hasattr (stream ._cursor , "state" )
194
+ assert str (stream ._cursor .state ) == "{'__ab_full_refresh_state_message': True}"
195
+
190
196
assert actual_records == expected_records
191
197
192
198
@@ -216,7 +222,9 @@ def test_full_refresh_read_a_single_slice(constructor):
216
222
217
223
expected_records = [* records ]
218
224
219
- # Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
225
+ # Synchronous streams emit a final state message to indicate that the stream has finished reading
226
+ # Concurrent streams don't emit their own state messages - the concurrent source observes the cursor
227
+ # and emits the state messages. Therefore, we can only check the value of the cursor's state at the end
220
228
if constructor == _stream :
221
229
expected_records .append (
222
230
AirbyteMessage (
@@ -233,6 +241,10 @@ def test_full_refresh_read_a_single_slice(constructor):
233
241
234
242
actual_records = _read (stream , configured_stream , logger , slice_logger , message_repository , state_manager , internal_config )
235
243
244
+ if constructor == _concurrent_stream :
245
+ assert hasattr (stream ._cursor , "state" )
246
+ assert str (stream ._cursor .state ) == "{'__ab_full_refresh_state_message': True}"
247
+
236
248
assert actual_records == expected_records
237
249
238
250
@@ -270,7 +282,9 @@ def test_full_refresh_read_two_slices(constructor):
270
282
* records_partition_2 ,
271
283
]
272
284
273
- # Temporary check to only validate the final state message for synchronous sources since it has not been implemented for concurrent yet
285
+ # Synchronous streams emit a final state message to indicate that the stream has finished reading
286
+ # Concurrent streams don't emit their own state messages - the concurrent source observes the cursor
287
+ # and emits the state messages. Therefore, we can only check the value of the cursor's state at the end
274
288
if constructor == _stream or constructor == _stream_with_no_cursor_field :
275
289
expected_records .append (
276
290
AirbyteMessage (
@@ -287,6 +301,10 @@ def test_full_refresh_read_two_slices(constructor):
287
301
288
302
actual_records = _read (stream , configured_stream , logger , slice_logger , message_repository , state_manager , internal_config )
289
303
304
+ if constructor == _concurrent_stream :
305
+ assert hasattr (stream ._cursor , "state" )
306
+ assert str (stream ._cursor .state ) == "{'__ab_full_refresh_state_message': True}"
307
+
290
308
for record in expected_records :
291
309
assert record in actual_records
292
310
assert len (actual_records ) == len (expected_records )
0 commit comments