@@ -70,11 +70,17 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
70
70
cursor_field = helper .field (stream .cursor_field )
71
71
record_value = cursor_field .parse (record = record .record .data )
72
72
try :
73
+ if state [stream_name ] is None :
74
+ continue
75
+
73
76
# first attempt to parse the state value assuming the state object is namespaced on stream names
74
77
state_value = cursor_field .parse (record = state [stream_name ], path = state_cursor_paths [stream_name ])
75
78
except KeyError :
76
- # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
77
- state_value = cursor_field .parse (record = state , path = state_cursor_paths [stream_name ])
79
+ try :
80
+ # try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
81
+ state_value = cursor_field .parse (record = state , path = state_cursor_paths [stream_name ])
82
+ except KeyError :
83
+ continue
78
84
yield record_value , state_value , stream_name
79
85
80
86
@@ -136,6 +142,68 @@ def test_two_sequential_reads(
136
142
record_value , state_value , threshold_days
137
143
), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: { stream_name } "
138
144
145
+ def test_read_sequential_slices (
146
+ self , inputs : IncrementalConfig , connector_config , configured_catalog_for_incremental , cursor_paths , docker_runner : ConnectorRunner
147
+ ):
148
+ """
149
+ Incremental test that makes calls the read method without a state checkpoint. Then we partition the results by stream and
150
+ slice checkpoints resulting in batches of messages that look like:
151
+ <state message>
152
+ <record message>
153
+ ...
154
+ <record message>
155
+
156
+ Using these batches, we then make additional read method calls using the state message and verify the correctness of the
157
+ messages in the response.
158
+ """
159
+ if inputs .skip_comprehensive_incremental_tests :
160
+ pytest .skip ("Skipping new incremental test based on acceptance-test-config.yml" )
161
+ return
162
+
163
+ threshold_days = getattr (inputs , "threshold_days" ) or 0
164
+ stream_mapping = {stream .stream .name : stream for stream in configured_catalog_for_incremental .streams }
165
+
166
+ output = docker_runner .call_read (connector_config , configured_catalog_for_incremental )
167
+ records_1 = filter_output (output , type_ = Type .RECORD )
168
+ states_1 = filter_output (output , type_ = Type .STATE )
169
+
170
+ assert states_1 , "Should produce at least one state"
171
+ assert records_1 , "Should produce at least one record"
172
+
173
+ latest_state = states_1 [- 1 ].state .data
174
+ for record_value , state_value , stream_name in records_with_state (records_1 , latest_state , stream_mapping , cursor_paths ):
175
+ assert (
176
+ record_value <= state_value
177
+ ), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: { stream_name } "
178
+
179
+ # Create partitions made up of one state message followed by any records that come before the next state
180
+ filtered_messages = [message for message in output if message .type == Type .STATE or message .type == Type .RECORD ]
181
+ right_index = len (filtered_messages )
182
+ checkpoint_messages = []
183
+ for index , message in reversed (list (enumerate (filtered_messages ))):
184
+ if message .type == Type .STATE :
185
+ message_group = (filtered_messages [index ], filtered_messages [index + 1 : right_index ])
186
+ checkpoint_messages .insert (0 , message_group )
187
+ right_index = index
188
+
189
+ # We sometimes have duplicate identical state messages in a stream which we can filter out to speed things up
190
+ checkpoint_messages = [message for index , message in enumerate (checkpoint_messages ) if message not in checkpoint_messages [:index ]]
191
+
192
+ # To avoid spamming APIs we only test a fraction of slices
193
+ num_slices_to_test = 1 if len (checkpoint_messages ) <= 5 else len (checkpoint_messages ) // 5
194
+ for message_batch in checkpoint_messages [::num_slices_to_test ]:
195
+ assert len (message_batch ) > 0 and message_batch [0 ].type == Type .STATE
196
+ current_state = message_batch [0 ]
197
+ output = docker_runner .call_read_with_state (connector_config , configured_catalog_for_incremental , current_state .state .data )
198
+ records = filter_output (output , type_ = Type .RECORD )
199
+
200
+ for record_value , state_value , stream_name in records_with_state (
201
+ records , current_state .state .data , stream_mapping , cursor_paths
202
+ ):
203
+ assert compare_cursor_with_threshold (
204
+ record_value , state_value , threshold_days
205
+ ), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: { stream_name } "
206
+
139
207
def test_state_with_abnormally_large_values (self , connector_config , configured_catalog , future_state , docker_runner : ConnectorRunner ):
140
208
configured_catalog = incremental_only_catalog (configured_catalog )
141
209
output = docker_runner .call_read_with_state (config = connector_config , catalog = configured_catalog , state = future_state )
0 commit comments