-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination S3: Deferred deletes on sync success in OVERWRITE #42579
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
This stack of pull requests is managed by Graphite. Learn more about stacking. |
2c71cb7
to
d66a66b
Compare
005b367
to
f2bdd27
Compare
d66a66b
to
2b602b5
Compare
f2bdd27
to
d1c1c0a
Compare
2b602b5
to
cc558fa
Compare
cdf2021
to
32f91da
Compare
d1c1c0a
to
3b439ea
Compare
32f91da
to
196731e
Compare
14faf69
to
96cbf8f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, had some fyi/questions
...inations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt
Show resolved
Hide resolved
...destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt
Outdated
Show resolved
Hide resolved
...destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt
Outdated
Show resolved
Hide resolved
...stinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
Outdated
Show resolved
Hide resolved
...estFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt
Outdated
Show resolved
Hide resolved
...estFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt
Outdated
Show resolved
Hide resolved
196731e
to
1627fdb
Compare
96cbf8f
to
4ac9531
Compare
e6055c8
to
475d5c2
Compare
4ac9531
to
1aaf8f9
Compare
123df63
to
6629bba
Compare
...stinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
Outdated
Show resolved
Hide resolved
9b0f98a
to
94bb9fe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one last question, lgtm once resolved
objectPath: String, | ||
maxKeysPerPage: Int = 1000, | ||
pageConsumer: (List<S3ObjectSummary>) -> Unit | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you intentionally keeping the regex matching stuff outside this method? (i.e. the Pattern.compile(getRegexFormat(namespace, streamName, pathFormat))
+ objectSumaries.filter { regex.matcher(it).matches() }
thing)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. wasn't sure if all usages are doing filter at the time of refactoring (turns out they seem to be)
51c23f8
to
1844494
Compare
* will be filtered with generationId metadata strictly less than [currentGenerationId] | ||
* @return List of keys of the objects | ||
*/ | ||
abstract fun listExistingObjects( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not requesting a change to this, but I wonder if in the new CDK we should try to keep the BL out of the storage operations as much as possible.
Like this is really an object search with a metadata comparator?
stream, | ||
outputBucketPath, | ||
pathFormat | ||
writeConfig.objectsFromOldGeneration.addAll( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another raised CDK question: what's the best way to handle accumulated stream state. Is appending to the config the right way? Does it make things less manageable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only remnant in S3 land, we avoided attaching to WriteConfig (or rather killed that object all together) and saving states at instance level of that StreamOperation
class. If there is a "state" machine then the FSM executor should accumulate and pass it along right ?
1844494
to
33e3e0d
Compare
94bb9fe
to
67436cc
Compare
33e3e0d
to
1844494
Compare
...stinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt
Outdated
Show resolved
Hide resolved
1844494
to
194365d
Compare
491ad7c
to
a626bae
Compare
733226f
to
8918b13
Compare
152d411
to
d3b7688
Compare
/publish-java-cdk
|
d3b7688
to
1f789b2
Compare
1f789b2
to
adfe484
Compare
What
Deferring the deletes until end of the sync to only delete on receving a
COMPLETE
for stream status.How
cleanUpObjects
, instead of deleting them immediately, collecting all the object keys marked for delete and sending them toonClose
function for actual deletion.Review guide
BlobStorageOperations
interface to support querying and cleaning the objects in a deferred fashion.WriteConfig
. Note that it is not thread-safe butonStart
andonClose
are only called once so nothing else is mutating that list in between.S3ConsumerFactory
for deferring the deletion actionOVERWRITE
modeUser Impact
Can this PR be safely reverted and rolled back?