[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code #45413
Conversation
@@ -58,6 +60,23 @@ def extract_value(self, record: Record) -> CursorValueType:
        return cursor_value  # type: ignore  # we assume that the value the path points at is a comparable


class InterpolatedCursorField(CursorField):
I've added this new InterpolatedCursorField so that we can perform interpolation at processing time, when we observe records, instead of at parse time, when we build the ConcurrentCursor.

I feel like this is a nice-to-have given the mental model that interpolation is only used at runtime, but the unfortunate side effect is that we leak interpolation as a concept into the concurrent CDK. I've included the code to spark discussion, but my vote is that we remove this and just perform the interpolation ahead of time when we instantiate the ConcurrentCursor.
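For reviewers skimming without the full diff, the idea looks roughly like the sketch below. The exact fields and the Record/InterpolatedString wiring are recalled from the CDK of this era, so treat them as approximate rather than the actual diff contents:

```python
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record


class InterpolatedCursorField(CursorField):
    """Resolves the cursor field expression per record, at processing time,
    instead of once at parse time."""

    def __init__(self, cursor_field: InterpolatedString, config: Mapping[str, Any]) -> None:
        # Keep the raw expression around; the base class only knows plain keys.
        super().__init__(cursor_field.string)
        self._interpolated_cursor_field = cursor_field
        self._config = config

    def extract_value(self, record: Record) -> Any:  # CursorValueType in the real code
        # Interpolation is evaluated here, when the record is observed.
        cursor_field_key = self._interpolated_cursor_field.eval(self._config)
        cursor_value = record.data.get(cursor_field_key)
        if cursor_value is None:
            raise ValueError(f"Could not find cursor field {cursor_field_key} in record")
        return cursor_value
```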
@@ -143,6 +162,7 @@ def __init__(
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
        cursor_granularity: Optional[GapType] = None,
cursor_granularity is part of low-code cursors, but not the concurrent one. The main lever it turns is reducing record duplicates from overlapping date-window edges.

Our overall stance has been that some duplicates over the course of a sync are tolerable, but we should never lose records. After auditing our existing repository connectors, we have 14 cases that use "P1D" or greater as the granularity. I'd consider that a potentially unacceptable number of duplicates, and it affects some of our most heavily used connectors, hence why I've added this in.
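To make the duplication lever concrete, here is a minimal standalone sketch of how granularity tightens slice boundaries. This is plain datetime arithmetic for illustration only; the real slicing lives inside ConcurrentCursor:

```python
from datetime import datetime, timedelta
from typing import Iterable, Tuple


def generate_slices(
    start: datetime,
    end: datetime,
    step: timedelta,
    cursor_granularity: timedelta = timedelta(0),
) -> Iterable[Tuple[datetime, datetime]]:
    """Yield [lower, upper] windows; subtracting the granularity from each
    upper bound keeps adjacent windows from sharing an edge."""
    lower = start
    while lower <= end:
        upper = min(lower + step - cursor_granularity, end)
        yield lower, upper
        if upper >= end:
            break
        lower = upper + cursor_granularity


# Without granularity: (Jan 1, Jan 8), (Jan 8, Jan 15), ... so records stamped
# exactly on Jan 8 appear in two slices. With cursor_granularity = P1D:
# (Jan 1, Jan 7), (Jan 8, Jan 14), ... and each day belongs to exactly one slice.
```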
Thanks for checking how often the feature is used.
I like the change of supporting granularity here.

On the topic of duplication and big granularities, isn't it driven by the API's granularity? For example, if an API only takes the format YYYY-MM as an input, I don't see how we could have anything other than P1M as the cursor granularity.
At the largest sizes we do start to lose some options, although I don't think the month or year option is that common, if I've ever seen it at all. Bad API design in my opinion. There's an interesting question of inference here: there's probably a 99% chance that a user with an hour-level datetime format would use hour granularity. But that might belong in the low-code interface rather than the underlying cursor, which would still need cursor_granularity to be configurable.
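Purely hypothetical and not part of this PR, but the inference idea floated above could be as simple as mapping the finest strftime directive in the connector's datetime format to a default granularity:

```python
from datetime import timedelta

# Hypothetical sketch: infer a default granularity from the finest
# strftime directive present in the connector's datetime_format.
_DIRECTIVE_TO_GRANULARITY = [
    ("%S", timedelta(seconds=1)),
    ("%M", timedelta(minutes=1)),
    ("%H", timedelta(hours=1)),
    ("%d", timedelta(days=1)),
]


def infer_cursor_granularity(datetime_format: str) -> timedelta:
    for directive, granularity in _DIRECTIVE_TO_GRANULARITY:
        if directive in datetime_format:
            return granularity
    # Month/year-only formats have no fixed-length timedelta; those would
    # still need an explicitly configured cursor_granularity.
    return timedelta(days=1)
```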
        ),
    ]
)
def test_generate_slices_concurrent_cursor_from_datetime_based_cursor(
I've added this set of specialized tests that starts with a DatetimeBasedCursor runtime object and transforms its fields into the ones needed by the ConcurrentCursor. They're meant to simulate what we'll need to do in the follow-up work to create the ConcurrentDeclarativeSource, which will instantiate concurrent DefaultStream instances from low-code streams.

The goal is to de-risk that part of the project by testing that the ConcurrentCursor is expressive enough and has the required features to match the existing low-code component.
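Condensed, those tests have roughly the following shape. The constructor arguments are recalled from the CDK of this era and abridged, so treat the exact signature and expected boundaries as approximate:

```python
import pendulum

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    IsoMillisConcurrentStreamStateConverter,
)


def test_generate_slices_from_datetime_based_cursor_fields():
    # Field-for-field translation of a low-code DatetimeBasedCursor config:
    # start_datetime -> start, step -> slice_range,
    # cursor_granularity -> cursor_granularity, cursor_field -> CursorField.
    cursor = ConcurrentCursor(
        stream_name="events",
        stream_namespace=None,
        stream_state={},
        message_repository=InMemoryMessageRepository(),
        connector_state_manager=ConnectorStateManager(stream_instance_map={}),
        connector_state_converter=IsoMillisConcurrentStreamStateConverter(),
        cursor_field=CursorField("updated_at"),
        slice_boundary_fields=("start_time", "end_time"),
        start=pendulum.datetime(2024, 1, 1),
        end_provider=lambda: pendulum.datetime(2024, 1, 31),
        slice_range=pendulum.duration(days=7),         # was `step: P7D`
        cursor_granularity=pendulum.duration(days=1),  # was `cursor_granularity: P1D`
    )

    slices = list(cursor.generate_slices())

    # Expect non-overlapping windows, e.g. (Jan 1, Jan 7), (Jan 8, Jan 14), ...
    assert slices
    assert all(lower <= upper for lower, upper in slices)
```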
🚢
 from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
     InterpolatedRequestOptionsProvider,
 )
 from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider

-__all__ = ["InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
+__all__ = ["DatetimeBasedRequestOptionsProvider", "DefaultRequestOptionsProvider", "InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
nit: should we add DefaultRequestOptionsProvider to the CDK's top-level __init__.py?
yep will add!
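For concreteness, the nit amounts to something like this in the top-level __init__.py (illustrative; in practice the import and the name would be appended to the existing lists rather than replacing them):

```python
from airbyte_cdk.sources.declarative.requesters.request_options import (
    DefaultRequestOptionsProvider,
)

__all__ = ["DefaultRequestOptionsProvider"]
```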
I'm very happy with decoupling the request_option_provider from the cursors in low-code 🎉

I've left a couple of comments, but nothing blocking, so I'll approve.
@@ -231,21 +234,21 @@ def _parse_response(
    ) -> Iterable[Record]:
        if not response:
            self._last_response = None
            return []
Nice catch!
@@ -159,6 +161,7 @@ def __init__(
        self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
        self._lookback_window = lookback_window
        self._slice_range = slice_range
        self._cursor_granularity = cursor_granularity
nit: We might have (a very small piece of) logic that is duplicated. My understanding is that this value is basically defined as part of the implementation of AbstractStreamStateConverter.increment. Is this fair? If so, would there be a way to not duplicate this logic?
The usage of DateTimeStreamStateConverter.increment() actually might not be duplicated, but it is an area I had not originally accounted for, so in that regard you found a bug I need to fix! The two usages of the method:

- _compare_interval(): which, interestingly, is completely unused in our code
- merge_intervals(): I need to fix the increment method so the DateTimeStreamStateConverter takes an optional cursor_granularity parameter, and we should use that in place of timedelta(milliseconds=1). Otherwise we won't properly merge partitions together, since they won't line up correctly.

I'll adjust the code accordingly and add a test case for this!
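A minimal sketch of the fix being described, assuming the converter grows an optional cursor_granularity. The class here is a simplified stand-in, not the real DateTimeStreamStateConverter:

```python
from datetime import datetime, timedelta
from typing import Optional


class DateTimeStreamStateConverter:  # simplified stand-in for the real converter
    def __init__(self, cursor_granularity: Optional[timedelta] = None) -> None:
        # Previously hardcoded to timedelta(milliseconds=1).
        self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1)

    def increment(self, timestamp: datetime) -> datetime:
        # merge_intervals() treats [a, b] and [c, d] as adjacent when
        # increment(b) == c. With day-grained slices, [Jan 1, Jan 7] and
        # [Jan 8, Jan 14] only merge if the increment is one day, not 1 ms.
        return timestamp + self._cursor_granularity
```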
/approve-regression-tests

After inspecting the results, the only change was the catalog having more streams with is_resumable = true. This ran the latest CDK against Jira on 4.5.0; RFR for substreams was released in 4.5.1, which explains the deviation.
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/9579
What

To add support for low-code streams to run concurrently, we need to add a couple of features to the ConcurrentCursor so that it can replace the existing DatetimeBasedCursor for partition generation and state management. The intent is that low-code incremental streams (non-substreams) will start using the concurrent DefaultStream and instantiate a ConcurrentCursor, and DeclarativeStream will no longer use DatetimeBasedCursor.

How
There are a few main aspects to the changes in this PR:

- The new DatetimeBasedRequestOptionsProvider allows state management and request injection to be separated (see the sketch after this list)
- Changes to model_to_component_factory.py to separate the cursor and request_options_provider, currently only for DatetimeBasedCursor
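A rough sketch of what the new provider owns, as a heavily simplified stand-in for the real datetime_based_request_options_provider.py; the actual class works with RequestOption objects rather than the bare parameter names used here:

```python
from typing import Any, Mapping, Optional


class DatetimeBasedRequestOptionsProvider:
    """Owns request injection only; slicing and state stay with the cursor."""

    def __init__(
        self,
        start_time_option: Optional[str] = None,
        end_time_option: Optional[str] = None,
        partition_field_start: str = "start_time",
        partition_field_end: str = "end_time",
    ) -> None:
        self._start_time_option = start_time_option
        self._end_time_option = end_time_option
        self._partition_field_start = partition_field_start
        self._partition_field_end = partition_field_end

    def get_request_params(
        self, *, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs: Any
    ) -> Mapping[str, Any]:
        # Read the window boundaries the cursor wrote into the slice and
        # inject them as query parameters.
        params: dict = {}
        if stream_slice:
            if self._start_time_option:
                params[self._start_time_option] = stream_slice[self._partition_field_start]
            if self._end_time_option:
                params[self._end_time_option] = stream_slice[self._partition_field_end]
        return params
```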
I also want to highlight the tests in test_cursor.py. While they aren't testing new cursor features specifically, I wanted to de-risk the last part of the project where we combine everything. In that work, we're going to have the new ConcurrentDeclarativeSource parse YAML into a low-code stream and use it to instantiate DefaultStream + ConcurrentCursor. These tests are meant to validate that we can turn low-code components into concurrent ones and that concurrent has feature parity.

Review guide
- cursor.py
- test_cursor.py
- datetime_based_request_options_provider.py
- simple_retriever.py
- model_to_component_factory.py
User Impact
Should be a no-op
Can this PR be safely reverted and rolled back?