Source Notion: Migrate to Low-Code #35974
Conversation
airbyte-integrations/connectors/source-notion/source_notion/components.py
Looks great so far @ChristoGrab! I appreciate the detailed PR description.
Just a few questions in the code, and a few more here:
The `pages` and `databases` incremental logic changed from a custom semi-incremental implementation to the Data Feed low-code implementation. This means the requests to these streams are now in descending rather than ascending order. This should have no effect from a user perspective.
The one effect that I'm wondering about is how we'll pick up where we left off if the sync is interrupted.
Since the Data Feed implementation already does this, I think we're okay, but I'm curious whether you're aware of a special reason that we were syncing this stream in ascending order previously.
Retry logic no longer reduces page size on repeated requests for low-code streams. My inclination is that this is not a big functionality loss, as the `blocks` stream still retains the original behavior, and it is by far the biggest data stream for this connector (everything in Notion is a block, so the vast majority of records will always come from this stream).
This suggests to me that we should regression test against a connection that requires retries.
Can we verify, e.g. from logs, that it's only the `blocks` stream that was hitting this?
Removed the call to transform the records read in the parent page stream when fetching slice ids for use in the blocks stream. Transformations are still applied to the records when syncing the low-code pages stream.
Is there any external impact as a result of this?
The production `comments` stream currently adds the parent page record's cursor field as a `page_last_edited_time` field to each comment record, which is used as the cursor field for semi-incremental syncs. This means all comments within a parent page's partition share the same cursor value. This has been updated to use per-partition state.
Was this necessary for the migration? Can we still handle the old state?
Is there a breaking changes checklist that I should look at?
```diff
@@ -33,6 +33,7 @@ acceptance_tests:
       configured_catalog_path: "integration_tests/incremental_catalog.json"
       future_state:
         future_state_path: "integration_tests/abnormal_state.json"
+      skip_comprehensive_incremental_tests: true
```
Why are we skipping these?
Glad you asked! This test is a frequent pain point for connectors with limited sandbox data and can throw failures even when nothing is wrong; at first glance I had assumed this was just another such case, but I took a deeper look into the test results and am seeing some fishy logs for the Blocks stream. Investigating further and will get back to you as soon as I have a clearer sense of what's going on here.
@cnoll Ok, I think I understand what's going on. The Blocks stream is a doozy: not only do we have no way of filtering or sorting results, but due to the nature of the recursive traversal through the blocks hierarchy, there isn't a defined top level of records used in slicing. Instead, an `id_stack` is populated with the parent stream ids; then as we query those pages, the nested blocks have their ids added to the stack and become the next slice (which will contain child blocks that will become slices for their own children, and down the rabbit hole we go). Right now the stream tracks the `cursor_value` of each record and uses a custom `StateValueWrapper` to compare and update the `max_cursor_time` until there is only one `id` remaining in the stack; this condition flips an `is_finished` boolean, which then triggers the final calculation of `max_cursor` in the `StateValueWrapper` to actually update the stream's state.
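To make that concrete, here's a rough sketch of the traversal (hypothetical names and simplified structure, not the connector's actual code):

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def read_blocks(
    parent_page_ids: List[str],
    fetch_children: Callable[[str], List[Dict[str, Any]]],
) -> Iterable[Dict[str, Any]]:
    # The stack is seeded with ids from the parent streams (pages/databases).
    id_stack = list(parent_page_ids)
    max_cursor_time: Optional[str] = None
    is_finished = False
    while id_stack:
        if len(id_stack) == 1:
            # Only one id left in the stack: this is the condition that
            # flips is_finished.
            is_finished = True
        block_id = id_stack.pop()
        for record in fetch_children(block_id):
            if record.get("has_children"):
                # Nested blocks become slices for their own children.
                id_stack.append(record["id"])
            cursor_value = record["last_edited_time"]
            # ISO-8601 timestamps compare correctly as plain strings.
            if max_cursor_time is None or cursor_value > max_cursor_time:
                max_cursor_time = cursor_value
            yield record
    # Only now, with is_finished flipped, does the wrapper publish
    # max_cursor_time as the stream's new state.
```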
Here are some print statements showing what these values look like when there is an existing state value:

```
current stream_state: {'last_edited_time': 2022-10-10T04:00:00.000Z}
current StateValueWrapper: stream=<source_notion.streams.Blocks object at 0x7fc71ff9a140> state_value='2022-10-10T04:00:00.000Z' max_cursor_time='2023-10-10T13:51:00.000Z' <- `max_cursor_time` represents the latest cursor value in the read thus far
most recent record_cursor: 2023-10-23T17:30:00.000Z
updated StateValueWrapper.max_cursor_time: 2023-10-23T17:30:00.000 <- this will keep updating and become the new value of state only if `self.is_finished` has been flipped
```
So, like the other Notion streams, it's functionally a full read operation that uses filtering on subsequent reads to prevent records from emitting. The issue exposed by the `comprehensive_incremental_tests` is that the value of state is only updated at the end of the read, rather than on each call to `get_updated_state`. Unfortunately, changing the logic to update the value of state after a slice is processed would be dangerous, since our client-side filtering logic would then start taking effect on the first read and exclude unsorted blocks records that have never been synced.

Ordinarily, the best solution would probably be to set up per-partition semi-incremental logic (like in the Comments stream) so we can update each slice's state as we go and aren't reliant on a first successful sync to set a value for `stream_state`. My one hesitation in this case is the inevitable ballooning of the state object for this stream. For reference, our tiny sandbox account with just a few basic sample pages has over 500 blocks records, any number of which could be a potential parent/slice for other blocks. I'm not certain what the memory limitations are for state objects, but I'm concerned about the potential scaling implications on larger Notion accounts, and unfortunately, no matter what we do, we can't change the fact that there's no server-side filtering we can use to limit the number of requests per sync.
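For a sense of the scaling concern, per-partition state for `blocks` would look something like this (illustrative shape and values only), with one entry per parent block, and so on for every block that has children:

```json
{
  "states": [
    {
      "partition": { "id": "block-id-1" },
      "cursor": { "last_edited_time": "2023-10-23T17:30:00.000Z" }
    },
    {
      "partition": { "id": "block-id-2" },
      "cursor": { "last_edited_time": "2023-10-21T09:12:00.000Z" }
    }
  ]
}
```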
Any thoughts on how to handle this? I'm tempted to say "working as intended!" for now since this is how the connector has always worked, but I'm admittedly biased by a desire to have this ready to ship by Monday 😅
I assume the mention should go to @clnoll
Whoops haha yes, so sorry!
Wow, thank you for the detailed explanation! This is super helpful.
> The issue exposed by the `comprehensive_incremental_tests` is that the value of state is only updated at the end of the read, rather than on each call to `get_updated_state`.

It sounds like the main side effect is that we may end up syncing some records more than necessary. Is that right?
In any case, I agree with you about the best solution, but also that since this is how it's always worked, we can leave it as-is for now. Would you mind creating an issue for this, in case we decide we want to follow up on it?
Yup, that sounds right: if we get halfway through a first sync and there's an interruption, any records that were successfully synced will still be emitted on the subsequent attempt. Put the issue together here: https://github.com/airbytehq/airbyte/issues/36698, just a copy/paste of the investigation to start but I'll do some grooming 👍
@ChristoGrab, is it possible to delete the `Blocks` stream from the catalog used during incremental tests, or will you enable the tests when they are fixed? #36814
airbyte-integrations/connectors/source-notion/source_notion/manifest.yaml
airbyte-integrations/connectors/source-notion/integration_tests/abnormal_state.json
@clnoll Thanks for the thorough review! I'll address the main block of comments one at a time here:
The previous behavior was sort of hacky, since the endpoint is very limited when it comes to sorting/filtering, and only allows us to specify the order and filter by type: page/database. So "incremental" is a bit of a cheeky misnomer, as every read is essentially a full refresh operation that simply uses client-side filtering to determine which records to emit. I checked the original PR to see if there was any indication of why the order was set to ascending, but didn't see anything pertinent. My naive guess is that the developer picked it as a seemingly sensible default rather than a deliberate design choice, given that the existing logic would always request all records regardless of the query order and state value. Sorry I don't have a more certain answer, but on the plus side, this means the migration actually has a positive impact for these streams which I failed to note: we now have an actual stop condition for requesting data once the appropriate cursor value has been reached!
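For reference, the stop condition comes from the low-code cursor. Roughly (an illustrative sketch; the field values here are not copied from the actual manifest):

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: last_edited_time
  datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
  start_datetime: "{{ config.get('start_date', '2020-01-01T00:00:00.000Z') }}"
  # Data feed mode: records arrive newest-first, and pagination stops once a
  # record older than the state cursor is encountered.
  is_data_feed: true
```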
Unfortunately my limited access means I can't view customer connections/logs to verify this (Maxime is looking into getting my access upgraded), but I can at least point to the original on-call issue that led to this functionality being added. For some context, the issue was related to frequent 504 errors plaguing an unusually large Notion workspace (~10k pages), exacerbated by unspecified changes going on in the Notion API at the time. Sorry, I know that's not much to go on; if this is a point of concern, I can certainly take a stab at implementing the logic as a CustomBackoffStrategy for the low-code streams as well!
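If we do want parity, the shape of the logic would be something like this (a hypothetical sketch only; the exact CDK hook for wiring this into a declarative `CustomBackoffStrategy` would need checking):

```python
import requests


class ReducePageSizeOnRetry:
    """Hypothetical sketch: shrink the requested page size on repeated
    server errors, mirroring the old Python streams' behavior."""

    def __init__(self, initial_page_size: int = 100, floor: int = 10) -> None:
        self.page_size = initial_page_size
        self.floor = floor

    def register_failure(self, response: requests.Response) -> None:
        # Notion's 504s tend to hit heavy pages; a smaller page is cheaper
        # for the API to serve on the retried request.
        if response.status_code in (500, 502, 504):
            self.page_size = max(self.floor, self.page_size // 2)

    def request_params(self) -> dict:
        return {"page_size": self.page_size}
```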
This change should provide a minor boost to performance when syncing the Blocks stream, since we're no longer applying transformations during the parent `read_records` call and can therefore process those records a tad more efficiently. Aside from that, there's no impact; the only value we need from this parent call is the record's `id`.
In theory, it would be possible to set this up to resume the previous behavior by adding a transformation that appends the parent `page` record's cursor value as a `page_last_edited_time` field to each record.
In the current implementation, we will emit all comments in page 2 and ignore all comments in page 1, because the cursor value each comment is compared against is inherited from its parent page rather than taken from the individual comment.
Thanks very much for the clarifications, @ChristoGrab! All makes sense to me. Let me know the results of your investigation into the `Blocks` stream test failures.
airbyte-integrations/connectors/source-notion/source_notion/components.py
🎉
@ChristoGrab Hi, have you tested these changes using the regression tool?
Approved. Issue for refactoring improvements: https://github.com/airbytehq/airbyte-internal-issues/issues/7311
What
This PR aims to migrate source Notion from the Python CDK to the low-code declarative framework. The proposed implementation is a hybrid connector, using the low-code manifest for all streams except the `blocks` stream, which has a fair amount of custom behavior, including a recursive fetching algorithm that I struggled to figure out how to implement as a custom component.
Issue with more context
Low-code migration reference
Changes
Non-Breaking
- The connection check was migrated from a custom implementation (a call to the `/users/me` endpoint, which contains the current user's data) to the standard manifest implementation (check stream: `pages`; a sketch follows this list). Some custom error handling has been lost in the process.
- Custom validations of the `start_date` were removed. The format of the `start_date` is already strictly enforced in the UI, and it is an optional field with a default value, so this felt like an acceptable loss.
- Retry logic no longer reduces page size on repeated requests for low-code streams. The `blocks` stream still retains the original behavior, and it is by far the biggest data stream for this connector (everything in Notion is a block, so the vast majority of records will always come from this stream).
- Removed the call to transform the records read in the parent `page` stream when fetching slice ids for use in the `blocks` stream. Transformations are still applied to the records when syncing the low-code `pages` stream.
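A minimal sketch of the standard manifest check referenced above (assuming the stock `CheckStream` component; surrounding config omitted):

```yaml
check:
  type: CheckStream
  stream_names:
    - pages
```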
🚨🚨 Breaking
- The production `comments` stream currently adds the parent `page` record's cursor field as a `page_last_edited_time` field to each `comment` record, which is used as the cursor field for semi-incremental syncs. This means all `comments` within a parent page's partition share the same cursor value. This has been updated to use per-partition state. The current and new state formats are sketched below.
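Illustrative shapes only (the timestamps and ids are placeholders, and the new format assumes the CDK's standard per-partition state):

Current:
```json
{ "page_last_edited_time": "2023-10-10T13:51:00.000Z" }
```

New:
```json
{
  "states": [
    {
      "partition": { "id": "<parent-page-id>" },
      "cursor": { "last_edited_time": "2023-10-10T13:51:00.000Z" }
    }
  ]
}
```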
The resulting change is twofold: the format of state has changed to per-partition, and the cursor field has been moved from the no-longer-appended `page_last_edited_time` to the record's existing `last_edited_time` field.
Notes