feat(registry): add remove stale partition job #38165
Conversation
LGTM.
Just wondering how you're going to test it.
Will this change materialize into a job you can manually trigger from Dagster?
Is there a reason we're not bumping the orchestrator package version?
stale_etags = [etag for etag in all_etag_partitions if etag not in all_fresh_etags]
context.log.info(f"Stale etags found: {stale_etags}")
for stale_etag in stale_etags:
nit: you can avoid one iteration if you filter and delete in a single iteration on all_etag_partitions
Ah great call. Will change
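For illustration, a minimal sketch of the single-pass version suggested above, assuming the same variable names as in the surrounding op:
# Sketch only: filter and delete in one pass over all_etag_partitions,
# assuming partition_name and all_fresh_etags are defined as in the op above.
fresh_etags = set(all_fresh_etags)
for etag in all_etag_partitions:
    if etag not in fresh_etags:
        context.log.info(f"Removing stale etag partition: {etag}")
        context.instance.delete_dynamic_partition(partition_name, etag)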
all_fresh_etags = [blob.etag for blob in all_metadata_file_blobs]

all_etag_partitions = context.instance.get_dynamic_partitions(partition_name)
context.instance.get_dynamic_partitions
calls the Dagster backend we want to clean up, right?
Correct! That line asks Dagster "Hey what metadata files do you have partitions for?"
@@ -24,6 +24,35 @@
)


@op(required_resource_keys={"all_metadata_file_blobs"})
def remove_stale_metadata_partitions_op(context):
Any reason not to perform this cleanup at partition insertion time?
Hmm, I wanted to introduce this logic in a separate area right now simply because I'm scared of it, since it's destructive.
If we accidentally delete ALL partitions, it's not the end of the world; it just means we may miss a failed metadata file and have to reingest all existing metadata files to know we're ok.
Also, Dagster doesn't let you bulk delete partition keys, and I'm worried about the time it would take to iterate over 10,000 keys to delete. It may lock up our sensor if deployed inside the add_partition sensor today.
So I wanted to keep it separate for now.
If things are looking good, I want to look at merging it back in.
Does that make sense?
💯 👍 - step by step :)
I've added unit tests, and tested it on my local Dagster. The next step is to trigger the job on production 😬
exactly!
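For context, a minimal sketch of what that manually triggerable job could look like, assuming the op from this PR; all_metadata_file_blobs_resource is a placeholder name for whatever resource definition backs that key:
# Sketch only: wiring the new op into a standalone job that can be launched
# manually from the Dagster UI.
from dagster import job

@job(resource_defs={"all_metadata_file_blobs": all_metadata_file_blobs_resource})
def remove_stale_metadata_partitions_job():
    remove_stale_metadata_partitions_op()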
Oh, that was a miss! I'll update.
After clearing up which ones are getting cleaned up, this makes sense 👍🏻 Interested in how many we're left with; I feel like it should be about half.
""" | ||
This op is responsible for polling for new metadata files and adding their etag to the dynamic partition. | ||
""" |
This should be updated!
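Something along these lines, for example (wording is just a suggestion):
"""
This op is responsible for removing dynamic partitions whose etag no longer matches an existing metadata file.
"""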
all_metadata_file_blobs = context.resources.all_metadata_file_blobs
partition_name = registry_entry.metadata_partitions_def.name

all_fresh_etags = [blob.etag for blob in all_metadata_file_blobs]
Would set subtraction be more appropriate here?
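For example, the stale-etag computation above could become something like this (sketch only, assuming etags are plain hashable strings):
# Sketch only: the same stale-etag computation via set subtraction.
stale_etags = set(all_etag_partitions) - set(all_fresh_etags)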
What
Add a job that lets us remove partition keys that no longer exist
Why
We have > 10,000 partitions, one for every metadata file ever. Likely only about 500 of those reference files that still exist.
Adding this job should let us clean out the noise.
Future
If it works, I'll add it to a nightly job.
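As a rough illustration of that future step, a nightly schedule around the cleanup job might look something like this (sketch only; the cron string is a placeholder and the job name is assumed from the sketch earlier in the thread):
# Sketch only: a nightly schedule wrapping the cleanup job.
from dagster import ScheduleDefinition

remove_stale_metadata_partitions_schedule = ScheduleDefinition(
    job=remove_stale_metadata_partitions_job,  # assumed job name
    cron_schedule="0 3 * * *",  # placeholder: nightly at 03:00
)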