We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent ec70172 commit 20eedffCopy full SHA for 20eedff
airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
@@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]:
243
data_to_return = dict(record_data)
244
self._stream.transformer.transform(data_to_return, self._stream.get_json_schema())
245
yield Record(data_to_return, self.stream_name())
246
+ elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD:
247
+ # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
248
+ yield Record(record_data.record.data, self.stream_name())
249
else:
250
self._message_repository.emit_message(record_data)
251
except Exception as e:
0 commit comments