Skip to content

Commit 033decc

Browse files
authored
add backward compatibility for an old close slice logic (#36774)
1 parent b27757f commit 033decc

File tree

5 files changed

+13
-7
lines changed

5 files changed

+13
-7
lines changed

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/cursor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
from abc import ABC, abstractmethod
6+
from typing import Any
67

78
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
89
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
@@ -34,7 +35,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
3435
pass
3536

3637
@abstractmethod
37-
def close_slice(self, stream_slice: StreamSlice) -> None:
38+
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
3839
"""
3940
Update state based on the stream slice. Note that `stream_slice.cursor_slice` and `most_recent_record.associated_slice` are expected
4041
to be the same but we make it explicit here that `stream_slice` should be leveraged to update the state. We do not pass in the

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
138138
):
139139
self._highest_observed_cursor_field_value = record_cursor_value
140140

141-
def close_slice(self, stream_slice: StreamSlice) -> None:
141+
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
142142
if stream_slice.partition:
143143
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
144144
cursor_value_str_by_cursor_value_datetime = dict(

airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,10 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
9191
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
9292
)
9393

94-
def close_slice(self, stream_slice: StreamSlice) -> None:
94+
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
9595
try:
9696
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
97-
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
97+
StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
9898
)
9999
except KeyError as exception:
100100
raise ValueError(

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ def read_records(
314314
# Fixing paginator types has a long tail of dependencies
315315
self._paginator.reset()
316316

317+
most_recent_record_from_slice = None
317318
record_generator = partial(
318319
self._parse_records,
319320
stream_state=self.state or {},
@@ -325,10 +326,14 @@ def read_records(
325326
if self.cursor and current_record:
326327
self.cursor.observe(_slice, current_record)
327328

329+
# Latest record read, not necessarily within slice boundaries.
330+
# TODO Remove once all custom components implement `observe` method.
331+
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
332+
most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice)
328333
yield stream_data
329334

330335
if self.cursor:
331-
self.cursor.close_slice(_slice)
336+
self.cursor.close_slice(_slice, most_recent_record_from_slice)
332337
return
333338

334339
def _get_most_recent_record(

airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ def retriever_read_pages(_, __, ___):
445445
side_effect=retriever_read_pages,
446446
):
447447
list(retriever.read_records(stream_slice=stream_slice, records_schema={}))
448-
cursor.close_slice.assert_called_once_with(stream_slice)
448+
cursor.close_slice.assert_called_once_with(stream_slice, first_record if first_greater_than_second else second_record)
449449

450450

451451
def test_given_stream_data_is_not_record_when_read_records_then_update_slice_with_optional_record():
@@ -478,7 +478,7 @@ def retriever_read_pages(_, __, ___):
478478
):
479479
list(retriever.read_records(stream_slice=stream_slice, records_schema={}))
480480
cursor.observe.assert_not_called()
481-
cursor.close_slice.assert_called_once_with(stream_slice)
481+
cursor.close_slice.assert_called_once_with(stream_slice, None)
482482

483483

484484
def _generate_slices(number_of_slices):

0 commit comments

Comments
 (0)