
[airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code #45413

Merged
merged 4 commits into from
Sep 17, 2024

Conversation

brianjlai
Contributor

@brianjlai brianjlai commented Sep 12, 2024

Closes https://github.com/airbytehq/airbyte-internal-issues/issues/9579

What

To add support for low-code streams to run concurrently, we need to add a couple of features to the ConcurrentCursor so that it can replace the existing DatetimeBasedCursor for partition generation and state management. The intent is that low-code incremental streams (non-substreams) will start using the concurrent DefaultStream and instantiating a ConcurrentCursor, and DeclarativeStream will no longer use DatetimeBasedCursor.

How

There are a few main aspects to the changes in this PR:

  • Adding support for cursor granularity to ConcurrentCursor. This was a low-code feature to reduce duplicates when windows overlap. We want to continue to support it because window overlaps at day or coarser granularity can lead to too many duplicates for some APIs
  • Decoupling request_option_provider from the cursor/stream_slicer interface. The reason for this is that we no longer want DeclarativeStreams (aside from edge cases) to manage their own state. We do, however, need to retain request parameter injection. A new DatetimeBasedRequestOptionsProvider allows state management and request injection to be separated
  • Updating model_to_component_factory.py to separate the cursor and request_options_provider, for DatetimeBasedCursor only
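As a rough sketch of the decoupling described above (the class names follow the PR, but the signatures here are illustrative, not the actual CDK interfaces):

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


class RequestOptionsProvider:
    """Illustrative interface: turns a stream slice into request parameters.
    It holds no cursor state; state management lives in the cursor."""

    def get_request_params(self, *, stream_slice: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        return {}


@dataclass
class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider):
    """Injects the slice's datetime window into request params, while a
    separate cursor (e.g. ConcurrentCursor) owns state. Field names here
    are assumptions for the sketch."""

    start_time_field: str = "start_time"
    end_time_field: str = "end_time"

    def get_request_params(self, *, stream_slice: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
        if not stream_slice:
            return {}
        return {
            self.start_time_field: stream_slice["start"],
            self.end_time_field: stream_slice["end"],
        }


provider = DatetimeBasedRequestOptionsProvider()
print(provider.get_request_params(stream_slice={"start": "2024-01-01", "end": "2024-01-31"}))
```

The point of the split is that the retriever can keep asking a provider for request params while the stream's state transitions are handled entirely by the cursor.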

I also want to highlight the tests in test_cursor.py. While they aren't testing new cursor features specifically, I wanted to de-risk the last part of the project where we combine everything. In that work, we're going to have the new ConcurrentDeclarativeStream parse YAML into a low-code stream and use it to instantiate DefaultStream + ConcurrentCursor. These tests are meant to validate that we are able to turn low-code components into concurrent components and that the concurrent components have feature parity.

Review guide

  1. cursor.py
  2. test_cursor.py
  3. datetime_based_request_options_provider.py
  4. simple_retriever.py
  5. model_to_component_factory.py

User Impact

Should be a no-op

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

…features to concurrent_cursor to support moving low-code datetime_based_cursor

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Sep 12, 2024
@@ -58,6 +60,23 @@ def extract_value(self, record: Record) -> CursorValueType:
return cursor_value # type: ignore # we assume that the value the path points at is a comparable


class InterpolatedCursorField(CursorField):
Contributor Author

I've added this new InterpolatedCursorField so that we can perform interpolation evaluation at processing time, when we observe records, instead of at parse time, when we build the ConcurrentCursor.

I feel like this is a nice-to-have from the mental model of interpolation only being used at runtime, but the unfortunate side effect is that we leak interpolation as a concept into the Concurrent CDK. I've included the code to spark discussion, but my vote is that we remove this and just perform the interpolation ahead of time when we instantiate the ConcurrentCursor.
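For illustration, a standalone sketch of the lazy-interpolation idea being discussed (this is not the CDK implementation; the interpolation logic is deliberately reduced to a toy that only handles the '{{ config.<key> }}' form):

```python
class CursorField:
    """Simplified stand-in for the concurrent CDK's cursor field."""

    def __init__(self, cursor_field_key: str) -> None:
        self.cursor_field_key = cursor_field_key

    def extract_value(self, record: dict):
        return record[self.cursor_field_key]


class InterpolatedCursorField(CursorField):
    """Resolve the interpolated field name lazily, when a record is observed,
    instead of at parse time when the cursor is built. Toy interpolation:
    only supports templates of the form '{{ config.<key> }}'."""

    def __init__(self, template: str, config: dict) -> None:
        self._template = template
        self._config = config

    def extract_value(self, record: dict):
        key = self._template
        if key.startswith("{{"):
            # look up the referenced config key at observe time
            key = self._config[key.strip("{} ").split(".")[-1]]
        return record[key]


field = InterpolatedCursorField("{{ config.cursor_field }}", {"cursor_field": "updated_at"})
print(field.extract_value({"updated_at": "2024-09-12"}))  # 2024-09-12
```

The alternative on the table is to resolve the template once, up front, and pass a plain CursorField into the ConcurrentCursor, keeping interpolation out of the concurrent package.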

@@ -143,6 +162,7 @@ def __init__(
end_provider: Callable[[], CursorValueType],
lookback_window: Optional[GapType] = None,
slice_range: Optional[GapType] = None,
cursor_granularity: Optional[GapType] = None,
Contributor Author

cursor_granularity is a part of low-code cursors, but not concurrent ones. The main lever this turns is reducing record duplicates from date window edges that overlap.

Our overall stance has been that some duplicates over the course of a sync are tolerable, but we should never lose records. After auditing our existing repository connectors, we have 14 cases where we use "P1D" or greater as the granularity. I would consider that a potentially unacceptable number of duplicates, and it affects some of our more highly used connectors, hence why I've added this in.
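To illustrate what cursor granularity does to slice boundaries, here is a simplified sketch (not the CDK's slice-generation code; inclusive datetime windows are assumed):

```python
from datetime import date, timedelta


def generate_slices(start, end, step, cursor_granularity=None):
    """Split the inclusive window [start, end] into slices of size `step`.
    Without a granularity, adjacent slices share a boundary, so records
    falling exactly on the boundary are fetched twice. With a granularity
    (e.g. one day for P1D), the next slice starts one unit after the
    previous slice's end."""
    slices = []
    lower = start
    while lower <= end:
        upper = min(lower + step, end)
        slices.append((lower, upper))
        if upper >= end:
            break
        lower = upper + cursor_granularity if cursor_granularity else upper
    return slices


# Shared boundaries: Jan 4 and Jan 7 each appear in two slices.
print(generate_slices(date(2024, 1, 1), date(2024, 1, 10), timedelta(days=3)))
# With day granularity the windows no longer share edges.
print(generate_slices(date(2024, 1, 1), date(2024, 1, 10), timedelta(days=3), timedelta(days=1)))
```

At day granularity, a shared edge duplicates an entire day of records per window, which is why the 14 connectors using "P1D" or greater matter.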

Contributor

thanks for checking how often the feature is used

Contributor

I like the change of supporting granularity here.

For the topic of duplication and big granularities, isn't it driven by the API granularity? For example, if an API only takes format YYYY-MM as an input, I don't see how we can have something different than P1M as a cursor granularity

Contributor Author

At the largest sizes we do start to lose some options, although I don't think month or year granularity is that common (if I've ever seen it); bad API design in my opinion. There's an interesting question of inference here: a user whose datetime format goes down to hours would almost certainly want hour granularity. But that might be more of a low-code interface concern than one for the underlying cursor, which would still need cursor_granularity to be configurable.

),
]
)
def test_generate_slices_concurrent_cursor_from_datetime_based_cursor(
Contributor Author

I've added this set of specialized tests that starts with a DatetimeBasedCursor runtime object and transforms its fields into the ones needed by the ConcurrentCursor. They're meant to simulate what we'll need to do when we start the follow-up work to create the ConcurrentDeclarativeSource, which will instantiate a concurrent DefaultStream from low-code streams.

It's meant to de-risk that part of the project by testing that the ConcurrentCursor is expressive enough and has the required features to match the existing low-code component.
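A hedged sketch of the kind of field translation these tests exercise; the function name, the argument names, and the config shape are all illustrative, and the duration parser only covers the simple P1D/PT1H-style forms:

```python
import re
from datetime import datetime, timedelta


def parse_duration(value):
    """Tiny ISO-8601 duration parser for the P1D / PT1H style strings used
    in low-code configs. Only days/hours/minutes/seconds are supported."""
    if not value:
        return None
    match = re.fullmatch(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", value)
    if not match:
        raise ValueError(f"unsupported duration: {value}")
    days, hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


def datetime_cursor_fields_to_concurrent_args(config: dict) -> dict:
    """Hypothetical mapping of DatetimeBasedCursor fields onto the keyword
    arguments a ConcurrentCursor-style constructor takes."""
    fmt = config["datetime_format"]
    return {
        "start": datetime.strptime(config["start_datetime"], fmt),
        "end_provider": lambda: datetime.strptime(config["end_datetime"], fmt),
        "slice_range": parse_duration(config.get("step")),
        "cursor_granularity": parse_duration(config.get("cursor_granularity")),
        "lookback_window": parse_duration(config.get("lookback_window")),
    }


args = datetime_cursor_fields_to_concurrent_args({
    "datetime_format": "%Y-%m-%d",
    "start_datetime": "2024-01-01",
    "end_datetime": "2024-06-30",
    "step": "P30D",
    "cursor_granularity": "P1D",
})
```

If every low-code field has a counterpart like this, the concurrent cursor has the feature parity the tests are trying to establish.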

@brianjlai brianjlai marked this pull request as ready for review September 12, 2024 18:04
Contributor

@girarda girarda left a comment
🚢

from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider

__all__ = ["InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
__all__ = ["DatetimeBasedRequestOptionsProvider", "DefaultRequestOptionsProvider", "InterpolatedRequestOptionsProvider", "RequestOptionsProvider"]
Contributor

nit: should we add DefaultRequestOptionsProvider to the CDK's top level init.py?

Contributor Author

yep will add!

Contributor

@maxi297 maxi297 left a comment
I'm very happy with decoupling request_option_provider from the cursors in low-code 🎉

I've left a couple of comments, but nothing blocking, so I'll approve

@@ -231,21 +234,21 @@ def _parse_response(
) -> Iterable[Record]:
if not response:
self._last_response = None
return []
Contributor

Nice catch!

@@ -159,6 +161,7 @@ def __init__(
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
self._lookback_window = lookback_window
self._slice_range = slice_range
self._cursor_granularity = cursor_granularity
Contributor

nit: We might have (a very small piece of) logic that is duplicated. My understanding is that this value is basically defined as part of the implementation of AbstractStreamStateConverter.increment. Is this fair? If so, would there be a way to not duplicate this logic?

Contributor Author

The usage of DateTimeStreamStateConverter.increment() actually might not be duplicated, but rather an area that I had not originally accounted for. So in that regard, you found a bug I need to fix! The two usages of the method:

  • _compare_interval(): which is, interestingly, completely unused in our code
  • merge_intervals(): I need to fix the increment method so that DateTimeStreamStateConverter takes an optional cursor_granularity parameter, which we should use in place of timedelta(milliseconds=1). Otherwise we won't properly merge partitions, since they won't line up correctly

I'll adjust the code accordingly and add a test case for this!
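A minimal sketch of the merge behavior in question, assuming sorted inclusive intervals represented as dicts (this is not the CDK's actual implementation):

```python
from datetime import datetime, timedelta


def merge_intervals(intervals, cursor_granularity=timedelta(milliseconds=1)):
    """Merge sorted inclusive [start, end] intervals that overlap or are
    adjacent. Two intervals count as adjacent when the gap between them is
    at most one granularity unit: with day granularity, [Jan 1, Jan 2] and
    [Jan 3, Jan 4] merge, which the millisecond default would miss."""
    merged = []
    for interval in intervals:
        if merged and interval["start"] <= merged[-1]["end"] + cursor_granularity:
            merged[-1]["end"] = max(merged[-1]["end"], interval["end"])
        else:
            merged.append(dict(interval))
    return merged


day = lambda d: datetime(2024, 1, d)
slices = [{"start": day(1), "end": day(2)}, {"start": day(3), "end": day(4)}]
print(merge_intervals(slices))                     # not merged: a 1ms granularity sees a gap
print(merge_intervals(slices, timedelta(days=1)))  # merged into [Jan 1, Jan 4]
```

This is why day-granularity partitions won't line up under the hard-coded timedelta(milliseconds=1): consecutive day windows always look like they have a gap between them.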

@brianjlai
Contributor Author

brianjlai commented Sep 17, 2024

/approve-regression-tests after inspecting the results; the only change was the catalog having more streams with is_resumable = true. We ran the latest CDK against Jira on 4.5.0, and RFR for substreams was released in 4.5.1, which explains the deviation

Check job output.

✅ Approving regression tests

@brianjlai brianjlai merged commit 199a807 into master Sep 17, 2024
35 of 36 checks passed
@brianjlai brianjlai deleted the brian/unify_low_code_and_concurrent_cursors branch September 17, 2024 18:06