45
45
46
46
47
47
DEFAULT_BATCH_SIZE = 10_000
48
+ DEBUG_MODE = False # Set to True to enable additional debug logging.
48
49
49
50
50
51
class BatchHandle :
@@ -60,6 +61,7 @@ class RecordProcessor(abc.ABC):
60
61
61
62
config_class : type [CacheConfigBase ]
62
63
skip_finalize_step : bool = False
64
+ _expected_streams : set [str ]
63
65
64
66
def __init__ (
65
67
self ,
@@ -109,6 +111,7 @@ def register_source(
109
111
incoming_source_catalog = incoming_source_catalog ,
110
112
incoming_stream_names = stream_names ,
111
113
)
114
+ self ._expected_streams = stream_names
112
115
113
116
@property
114
117
def _streams_with_data (self ) -> set [str ]:
@@ -203,12 +206,18 @@ def process_airbyte_messages(
203
206
# Type.LOG, Type.TRACE, Type.CONTROL, etc.
204
207
pass
205
208
209
+ # Add empty streams to the dictionary so that a destination table is created for each of them
210
+ for stream_name in self ._expected_streams :
211
+ if stream_name not in stream_batches :
212
+ if DEBUG_MODE :
213
+ print (f"Stream { stream_name } has no data" )
214
+ stream_batches [stream_name ] = []
215
+
206
216
# We are at the end of the stream. Process whatever else is queued.
207
217
for stream_name , stream_batch in stream_batches .items ():
208
- if stream_batch :
209
- record_batch = pa .Table .from_pylist (stream_batch )
210
- self ._process_batch (stream_name , record_batch )
211
- progress .log_batch_written (stream_name , len (stream_batch ))
218
+ record_batch = pa .Table .from_pylist (stream_batch )
219
+ self ._process_batch (stream_name , record_batch )
220
+ progress .log_batch_written (stream_name , len (stream_batch ))
212
221
213
222
# Finalize any pending batches
214
223
for stream_name in list (self ._pending_batches .keys ()):
0 commit comments