Skip to content

Commit 643c07d

Browse files
clnolljatinyadav-cc
authored andcommitted
File-based CDK: enqueue AirbyteMessage of type record instead of sending to the message repository (airbytehq#35318)
1 parent a7211a7 commit 643c07d

File tree

1 file changed

+3
-0
lines changed
  • airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent

1 file changed

+3
-0
lines changed

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

+3
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]:
243243
data_to_return = dict(record_data)
244244
self._stream.transformer.transform(data_to_return, self._stream.get_json_schema())
245245
yield Record(data_to_return, self.stream_name())
246+
elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD:
247+
# `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
248+
yield Record(record_data.record.data, self.stream_name())
246249
else:
247250
self._message_repository.emit_message(record_data)
248251
except Exception as e:

0 commit comments

Comments
 (0)