@@ -120,7 +120,8 @@ def get_message_groups(
120
120
raise ValueError (f"Unknown message group type: { type (message_group )} " )
121
121
122
122
try :
123
- configured_stream = configured_catalog .streams [0 ] # The connector builder currently only supports reading from a single stream at a time
123
+ # The connector builder currently only supports reading from a single stream at a time
124
+ configured_stream = configured_catalog .streams [0 ]
124
125
schema = schema_inferrer .get_stream_schema (configured_stream .stream .name )
125
126
except SchemaValidationException as exception :
126
127
for validation_error in exception .validation_errors :
@@ -183,7 +184,11 @@ def _get_message_groups(
183
184
and message .type == MessageType .LOG
184
185
and message .log .message .startswith (SliceLogger .SLICE_LOG_PREFIX )
185
186
):
186
- yield StreamReadSlices (pages = current_slice_pages , slice_descriptor = current_slice_descriptor , state = [latest_state_message ] if latest_state_message else [])
187
+ yield StreamReadSlices (
188
+ pages = current_slice_pages ,
189
+ slice_descriptor = current_slice_descriptor ,
190
+ state = [latest_state_message ] if latest_state_message else [],
191
+ )
187
192
current_slice_descriptor = self ._parse_slice_description (message .log .message )
188
193
current_slice_pages = []
189
194
at_least_one_page_in_group = False
@@ -230,7 +235,11 @@ def _get_message_groups(
230
235
else :
231
236
if current_page_request or current_page_response or current_page_records :
232
237
self ._close_page (current_page_request , current_page_response , current_slice_pages , current_page_records )
233
- yield StreamReadSlices (pages = current_slice_pages , slice_descriptor = current_slice_descriptor , state = [latest_state_message ] if latest_state_message else [])
238
+ yield StreamReadSlices (
239
+ pages = current_slice_pages ,
240
+ slice_descriptor = current_slice_descriptor ,
241
+ state = [latest_state_message ] if latest_state_message else [],
242
+ )
234
243
235
244
@staticmethod
236
245
def _need_to_close_page (at_least_one_page_in_group : bool , message : AirbyteMessage , json_message : Optional [Dict [str , Any ]]) -> bool :
@@ -281,8 +290,11 @@ def _close_page(
281
290
current_page_records .clear ()
282
291
283
292
def _read_stream (
284
- self , source : DeclarativeSource , config : Mapping [str , Any ], configured_catalog : ConfiguredAirbyteCatalog ,
285
- state : List [AirbyteStateMessage ]
293
+ self ,
294
+ source : DeclarativeSource ,
295
+ config : Mapping [str , Any ],
296
+ configured_catalog : ConfiguredAirbyteCatalog ,
297
+ state : List [AirbyteStateMessage ],
286
298
) -> Iterator [AirbyteMessage ]:
287
299
# the generator can raise an exception
288
300
# iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
0 commit comments