Skip to content

Commit fbccc9d

Browse files
maxi297 authored and jatinyadav-cc committed
Emit state when no partitions are generated for ccdk (airbytehq#34605)
1 parent 8d778a7 commit fbccc9d

File tree

12 files changed

+78
-14
lines changed

12 files changed

+78
-14
lines changed

airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,12 @@ def on_partition_generation_completed(self, sentinel: PartitionGenerationComplet
6666
"""
6767
stream_name = sentinel.stream.name
6868
self._streams_currently_generating_partitions.remove(sentinel.stream.name)
69-
ret = []
7069
# It is possible for the stream to already be done if no partitions were generated
7170
# If the partition generation process was completed and there are no partitions left to process, the stream is done
7271
if self._is_stream_done(stream_name) or len(self._streams_to_running_partitions[stream_name]) == 0:
73-
ret.append(self._on_stream_is_done(stream_name))
72+
yield from self._on_stream_is_done(stream_name)
7473
if self._stream_instances_to_start_partition_generation:
75-
ret.append(self.start_next_partition_generator())
76-
return ret
74+
yield self.start_next_partition_generator()
7775

7876
def on_partition(self, partition: Partition) -> None:
7977
"""
@@ -102,7 +100,7 @@ def on_partition_complete_sentinel(self, sentinel: PartitionCompleteSentinel) ->
102100
partitions_running.remove(partition)
103101
# If all partitions were generated and this was the last one, the stream is done
104102
if partition.stream_name() not in self._streams_currently_generating_partitions and len(partitions_running) == 0:
105-
yield self._on_stream_is_done(partition.stream_name())
103+
yield from self._on_stream_is_done(partition.stream_name())
106104
yield from self._message_repository.consume_queue()
107105

108106
def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
@@ -171,13 +169,15 @@ def is_done(self) -> bool:
171169
def _is_stream_done(self, stream_name: str) -> bool:
172170
return stream_name in self._streams_done
173171

174-
def _on_stream_is_done(self, stream_name: str) -> AirbyteMessage:
172+
def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]:
175173
self._logger.info(f"Read {self._record_counter[stream_name]} records from {stream_name} stream")
176174
self._logger.info(f"Marking stream {stream_name} as STOPPED")
177175
stream = self._stream_name_to_instance[stream_name]
176+
stream.cursor.ensure_at_least_one_state_emitted()
177+
yield from self._message_repository.consume_queue()
178178
self._logger.info(f"Finished syncing {stream.name}")
179179
self._streams_done.add(stream_name)
180-
return stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.COMPLETE)
180+
yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), AirbyteStreamStatus.COMPLETE)
181181

182182
def _stop_streams(self) -> Iterable[AirbyteMessage]:
183183
self._thread_pool_manager.shutdown()

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from airbyte_cdk.models import AirbyteStream
99
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
10+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1011
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1112
from deprecated.classic import deprecated
1213

@@ -81,3 +82,10 @@ def log_stream_sync_configuration(self) -> None:
8182
"""
8283
Logs the stream's configuration for debugging purposes.
8384
"""
85+
86+
@property
87+
@abstractmethod
88+
def cursor(self) -> Cursor:
89+
"""
90+
:return: The cursor associated with this stream.
91+
"""

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def create_from_stream(
8989
primary_key=pk,
9090
cursor_field=cursor_field,
9191
logger=logger,
92+
cursor=cursor,
9293
),
9394
stream,
9495
cursor,

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ def close_partition(self, partition: Partition) -> None:
5656
"""
5757
raise NotImplementedError()
5858

59+
@abstractmethod
60+
def ensure_at_least_one_state_emitted(self) -> None:
61+
"""
62+
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per
63+
stream. Hence, if no partitions are generated, this method needs to be called.
64+
"""
65+
raise NotImplementedError()
66+
5967

6068
class NoopCursor(Cursor):
6169
@property
@@ -68,6 +76,9 @@ def observe(self, record: Record) -> None:
6876
def close_partition(self, partition: Partition) -> None:
6977
pass
7078

79+
def ensure_at_least_one_state_emitted(self) -> None:
80+
pass
81+
7182

7283
class ConcurrentCursor(Cursor):
7384
_START_BOUNDARY = 0
@@ -179,3 +190,10 @@ def _extract_from_slice(self, partition: Partition, key: str) -> Comparable:
179190
return self._connector_state_converter.parse_value(_slice[key]) # type: ignore # we expect the devs to specify a key that would return a Comparable
180191
except KeyError as exception:
181192
raise KeyError(f"Partition is expected to have key `{key}` but could not be found") from exception
193+
194+
def ensure_at_least_one_state_emitted(self) -> None:
195+
"""
196+
The platform expects to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
197+
called.
198+
"""
199+
self._emit_state_message()

airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from airbyte_cdk.models import AirbyteStream, SyncMode
1010
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1111
from airbyte_cdk.sources.streams.concurrent.availability_strategy import AbstractAvailabilityStrategy, StreamAvailability
12+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, NoopCursor
1213
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1314
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
1415

@@ -23,6 +24,7 @@ def __init__(
2324
primary_key: List[str],
2425
cursor_field: Optional[str],
2526
logger: Logger,
27+
cursor: Optional[Cursor],
2628
namespace: Optional[str] = None,
2729
) -> None:
2830
self._stream_partition_generator = partition_generator
@@ -32,6 +34,7 @@ def __init__(
3234
self._primary_key = primary_key
3335
self._cursor_field = cursor_field
3436
self._logger = logger
37+
self._cursor = cursor or NoopCursor()
3538
self._namespace = namespace
3639

3740
def generate_partitions(self) -> Iterable[Partition]:
@@ -77,3 +80,7 @@ def log_stream_sync_configuration(self) -> None:
7780
"cursor_field": self.cursor_field,
7881
},
7982
)
83+
84+
@property
85+
def cursor(self) -> Cursor:
86+
return self._cursor

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/incremental_scenarios.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
7777
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
7878
{"stream1": {"cursor_field": 2}},
79+
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
7980
]
8081
)
8182
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -152,6 +153,7 @@
152153
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
153154
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
154155
{"stream1": {"cursor_field": 2}},
156+
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
155157
]
156158
)
157159
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -239,6 +241,7 @@
239241
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
240242
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
241243
{"stream1": {"cursor_field": 2}},
244+
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
242245
]
243246
)
244247
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@
361361
{"data": {"id": "3", "cursor_field": 2}, "stream": "stream1"},
362362
{"data": {"id": "4", "cursor_field": 3}, "stream": "stream1"},
363363
{"stream1": {"cursor_field": 2}},
364+
{"stream1": {"cursor_field": 2}}, # see Cursor.ensure_at_least_one_state_emitted
364365
]
365366
)
366367
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})
@@ -403,6 +404,7 @@
403404
{"data": {"id": "1", "cursor_field": 0}, "stream": "stream1"},
404405
{"data": {"id": "2", "cursor_field": 3}, "stream": "stream1"},
405406
{"stream1": {"cursor_field": 3}},
407+
{"stream1": {"cursor_field": 3}}, # see Cursor.ensure_at_least_one_state_emitted
406408
]
407409
)
408410
.set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"})

airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55

66
from airbyte_cdk.sources.message import InMemoryMessageRepository
7+
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
78
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
89
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
910
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
@@ -29,6 +30,7 @@
2930
primary_key=[],
3031
cursor_field=None,
3132
logger=logging.getLogger("test_logger"),
33+
cursor=NoopCursor(),
3234
)
3335

3436
_id_only_stream_with_slice_logger = DefaultStream(
@@ -46,6 +48,7 @@
4648
primary_key=[],
4749
cursor_field=None,
4850
logger=logging.getLogger("test_logger"),
51+
cursor=NoopCursor(),
4952
)
5053

5154
_id_only_stream_with_primary_key = DefaultStream(
@@ -63,6 +66,7 @@
6366
primary_key=["id"],
6467
cursor_field=None,
6568
logger=logging.getLogger("test_logger"),
69+
cursor=NoopCursor(),
6670
)
6771

6872
_id_only_stream_multiple_partitions = DefaultStream(
@@ -83,6 +87,7 @@
8387
primary_key=[],
8488
cursor_field=None,
8589
logger=logging.getLogger("test_logger"),
90+
cursor=NoopCursor(),
8691
)
8792

8893
_id_only_stream_multiple_partitions_concurrency_level_two = DefaultStream(
@@ -103,6 +108,7 @@
103108
primary_key=[],
104109
cursor_field=None,
105110
logger=logging.getLogger("test_logger"),
111+
cursor=NoopCursor(),
106112
)
107113

108114
_stream_raising_exception = DefaultStream(
@@ -120,6 +126,7 @@
120126
primary_key=[],
121127
cursor_field=None,
122128
logger=logging.getLogger("test_logger"),
129+
cursor=NoopCursor(),
123130
)
124131

125132
test_concurrent_cdk_single_stream = (
@@ -246,6 +253,7 @@
246253
primary_key=[],
247254
cursor_field=None,
248255
logger=logging.getLogger("test_logger"),
256+
cursor=NoopCursor(),
249257
),
250258
]
251259
)

airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44
import logging
55
import unittest
6-
from unittest.mock import Mock
6+
from unittest.mock import Mock, call
77

88
import freezegun
99
from airbyte_cdk.models import (
@@ -32,6 +32,7 @@
3232

3333
_STREAM_NAME = "stream"
3434
_ANOTHER_STREAM_NAME = "stream2"
35+
_ANY_AIRBYTE_MESSAGE = Mock(spec=AirbyteMessage)
3536

3637

3738
class TestConcurrentReadProcessor(unittest.TestCase):
@@ -110,6 +111,10 @@ def test_handle_partition_done_no_other_streams_to_generate_partitions_for(self)
110111

111112
@freezegun.freeze_time("2020-01-01T00:00:00")
112113
def test_handle_last_stream_partition_done(self):
114+
in_order_validation_mock = Mock()
115+
in_order_validation_mock.attach_mock(self._another_stream, "_another_stream")
116+
in_order_validation_mock.attach_mock(self._message_repository, '_message_repository')
117+
self._message_repository.consume_queue.return_value = iter([_ANY_AIRBYTE_MESSAGE])
113118
stream_instances_to_read_from = [self._another_stream]
114119

115120
handler = ConcurrentReadProcessor(
@@ -124,9 +129,10 @@ def test_handle_last_stream_partition_done(self):
124129
handler.start_next_partition_generator()
125130

126131
sentinel = PartitionGenerationCompletedSentinel(self._another_stream)
127-
messages = handler.on_partition_generation_completed(sentinel)
132+
messages = list(handler.on_partition_generation_completed(sentinel))
128133

129134
expected_messages = [
135+
_ANY_AIRBYTE_MESSAGE,
130136
AirbyteMessage(
131137
type=MessageType.TRACE,
132138
trace=AirbyteTraceMessage(
@@ -140,6 +146,7 @@ def test_handle_last_stream_partition_done(self):
140146
)
141147
]
142148
assert expected_messages == messages
149+
assert in_order_validation_mock.mock_calls.index(call._another_stream.cursor.ensure_at_least_one_state_emitted) < in_order_validation_mock.mock_calls.index(call._message_repository.consume_queue)
143150

144151
def test_handle_partition(self):
145152
stream_instances_to_read_from = [self._another_stream]
@@ -236,7 +243,7 @@ def test_handle_on_partition_complete_sentinel_yields_status_message_if_the_stre
236243
)
237244
handler.start_next_partition_generator()
238245
handler.on_partition(self._a_closed_partition)
239-
handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._another_stream))
246+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._another_stream)))
240247

241248
sentinel = PartitionCompleteSentinel(self._a_closed_partition)
242249

@@ -543,8 +550,8 @@ def test_on_exception_does_not_stop_streams_that_are_already_done(self):
543550

544551
handler.start_next_partition_generator()
545552
handler.on_partition(self._an_open_partition)
546-
handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream))
547-
handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._another_stream))
553+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._stream)))
554+
list(handler.on_partition_generation_completed(PartitionGenerationCompletedSentinel(self._another_stream)))
548555

549556
another_stream = Mock(spec=AbstractStream)
550557
another_stream.name = _STREAM_NAME

airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_default_stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from airbyte_cdk.models import AirbyteStream, SyncMode
88
from airbyte_cdk.sources.streams.concurrent.availability_strategy import STREAM_AVAILABLE
9-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
9+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, NoopCursor
1010
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
1111

1212

@@ -28,6 +28,7 @@ def setUp(self):
2828
self._primary_key,
2929
self._cursor_field,
3030
self._logger,
31+
NoopCursor(),
3132
)
3233

3334
def test_get_json_schema(self):
@@ -88,6 +89,7 @@ def test_as_airbyte_stream_with_primary_key(self):
8889
["id"],
8990
self._cursor_field,
9091
self._logger,
92+
NoopCursor(),
9193
)
9294

9395
expected_airbyte_stream = AirbyteStream(
@@ -119,6 +121,7 @@ def test_as_airbyte_stream_with_composite_primary_key(self):
119121
["id_a", "id_b"],
120122
self._cursor_field,
121123
self._logger,
124+
NoopCursor(),
122125
)
123126

124127
expected_airbyte_stream = AirbyteStream(
@@ -150,6 +153,7 @@ def test_as_airbyte_stream_with_a_cursor(self):
150153
self._primary_key,
151154
"date",
152155
self._logger,
156+
NoopCursor(),
153157
)
154158

155159
expected_airbyte_stream = AirbyteStream(
@@ -174,6 +178,7 @@ def test_as_airbyte_stream_with_namespace(self):
174178
self._primary_key,
175179
self._cursor_field,
176180
self._logger,
181+
NoopCursor(),
177182
namespace="test",
178183
)
179184
expected_airbyte_stream = AirbyteStream(

0 commit comments

Comments
 (0)