
Commit 1aaf8f9

s3-no-downtime-full-refresh

1 parent 475d5c2 commit 1aaf8f9

File tree: 5 files changed (+167, -16 lines)

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt (+16)
@@ -53,6 +53,22 @@ abstract class BlobStorageOperations protected constructor() {
         pathFormat: String
     )
 
+    /** Clean up all the objects matching the provided [keysToDelete]. */
+    abstract fun cleanUpObjects(keysToDelete: List<String>)
+
+    /**
+     * List all the existing bucket objects for a given [namespace], [streamName], and
+     * [objectPath] that match the [pathFormat] regex.
+     *
+     * @return the keys of the matching objects
+     */
+    abstract fun listExistingObjects(
+        namespace: String?,
+        streamName: String,
+        objectPath: String,
+        pathFormat: String
+    ): List<String>
+
     abstract fun dropBucketObject(objectPath: String)
 
     abstract fun isValidData(jsonNode: JsonNode): Boolean
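The two new abstract methods split the old delete-immediately behavior into a list step and a delete step, which is what lets callers defer deletion until a stream has succeeded. Below is a minimal sketch of that contract against a hypothetical in-memory store; the class name InMemoryBlobStorage and the simplification of pathFormat to a plain regex are illustrative assumptions, not the CDK API:

// Hypothetical stand-in for a bucket; not part of the commit.
class InMemoryBlobStorage {
    private val objects = mutableMapOf<String, ByteArray>()

    fun put(key: String, data: ByteArray) {
        objects[key] = data
    }

    // Mirrors listExistingObjects: return keys under objectPath that match the
    // path regex. The real S3 implementation derives the regex from pathFormat
    // via getRegexFormat; here we assume pathFormat is already a regex.
    fun listExistingObjects(objectPath: String, pathFormat: String): List<String> {
        val regex = Regex(pathFormat)
        return objects.keys.filter { it.startsWith(objectPath) && regex.matches(it) }
    }

    // Mirrors cleanUpObjects: delete exactly the listed keys, nothing else.
    fun cleanUpObjects(keysToDelete: List<String>) {
        keysToDelete.forEach { objects.remove(it) }
    }
}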

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt (+53, -8)
@@ -69,17 +69,20 @@ class S3ConsumerFactory {
             val pathFormat = writeConfig.pathFormat
             if (mustCleanUpExistingObjects(writeConfig, storageOperations)) {
                 LOGGER.info {
-                    "Clearing storage area in destination started for namespace $namespace " +
+                    "Listing objects to clean up for namespace $namespace " +
                         "stream $stream bucketObject $outputBucketPath pathFormat $pathFormat"
                 }
-                storageOperations.cleanUpBucketObject(
-                    namespace,
-                    stream,
-                    outputBucketPath,
-                    pathFormat
+                writeConfig.objectsFromOldGeneration.addAll(
+                    storageOperations.listExistingObjects(
+                        namespace,
+                        stream,
+                        outputBucketPath,
+                        pathFormat,
+                    ),
                 )
                 LOGGER.info {
-                    "Clearing storage area in destination completed for namespace $namespace stream $stream bucketObject $outputBucketPath"
+                    "Marked ${writeConfig.objectsFromOldGeneration.size} keys for deletion at end of sync " +
+                        "for namespace $namespace stream $stream bucketObject $outputBucketPath"
                 }
             } else {
                 LOGGER.info {
@@ -138,8 +141,17 @@ class S3ConsumerFactory {
         storageOperations: BlobStorageOperations,
         writeConfigs: List<WriteConfig>
     ): OnCloseFunction {
-        return OnCloseFunction { hasFailed: Boolean, _: Map<StreamDescriptor, StreamSyncSummary> ->
+
+        val streamDescriptorToWriteConfig =
+            writeConfigs.associateBy {
+                StreamDescriptor().withNamespace(it.namespace).withName(it.streamName)
+            }
+        return OnCloseFunction {
+            hasFailed: Boolean,
+            streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary> ->
             if (hasFailed) {
+                // TODO: From the code it looks like hasFailed is only set to true after
+                // onClose is called. Does this branch ever get invoked?
                 LOGGER.info { "Cleaning up destination started for ${writeConfigs.size} streams" }
                 for (writeConfig in writeConfigs) {
                     storageOperations.cleanUpBucketObject(
@@ -150,6 +162,39 @@ class S3ConsumerFactory {
                 }
                 LOGGER.info { "Cleaning up destination completed." }
             }
+
+            // On stream success, clean up the objects marked for deletion. This is done
+            // on a per-stream basis.
+            streamSyncSummaries.forEach { entry: Map.Entry<StreamDescriptor, StreamSyncSummary> ->
+                val streamSuccessful =
+                    entry.value.terminalStatus ==
+                        AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
+                if (streamSuccessful) {
+                    val writeConfig = streamDescriptorToWriteConfig[entry.key]
+                    writeConfig?.let {
+                        if (it.objectsFromOldGeneration.isNotEmpty()) {
+                            // Although the S3 API safely accepts an empty list of keys,
+                            // avoid the unnecessary S3 call. The logic that determines what
+                            // to delete lives in onStart, so no redundant destinationSyncMode
+                            // checks are repeated here.
+                            LOGGER.info {
+                                "Found ${it.objectsFromOldGeneration.size} objects marked for deletion in namespace: ${entry.key.namespace}, stream: ${entry.key.name}. " +
+                                    "Proceeding with cleaning up the objects"
+                            }
+                            storageOperations.cleanUpObjects(it.objectsFromOldGeneration)
+                            LOGGER.info {
+                                "Cleaning up completed for namespace: ${entry.key.namespace}, stream: ${entry.key.name}"
+                            }
+                        }
+                    }
+                } else {
+                    LOGGER.info {
+                        "Stream not successful with status ${entry.value.terminalStatus} for namespace: ${entry.key.namespace}, name: ${entry.key.name}. " +
+                            "Skipping deletion of any old objects marked for deletion."
+                    }
+                }
+            }
         }
     }
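Stripped of the CDK types, the lifecycle introduced here is: onStart records the previous generation's keys instead of deleting them, and onClose deletes them only for streams that report COMPLETE. A self-contained sketch with simplified stand-in types (Storage, StreamConfig, and StreamStatus are illustrative, not the CDK API):

enum class StreamStatus { COMPLETE, INCOMPLETE }

data class StreamConfig(
    val stream: String,
    val oldGenerationKeys: MutableList<String> = mutableListOf()
)

interface Storage {
    fun listKeys(stream: String): List<String>
    fun deleteKeys(keys: List<String>)
}

fun onStart(storage: Storage, config: StreamConfig) {
    // Instead of deleting the previous generation immediately (data downtime),
    // just remember which keys belong to it.
    config.oldGenerationKeys += storage.listKeys(config.stream)
}

fun onClose(storage: Storage, config: StreamConfig, status: StreamStatus) {
    // Delete the old generation only once the stream finished successfully;
    // on failure the previous data stays in place and remains readable.
    if (status == StreamStatus.COMPLETE && config.oldGenerationKeys.isNotEmpty()) {
        storage.deleteKeys(config.oldGenerationKeys)
    }
}

On a failed stream the old objects are simply left untouched, so readers never observe an empty destination mid-refresh; that is the "no downtime" in the commit title.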

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt (+36)
@@ -459,6 +459,42 @@ open class S3StorageOperations(
         }
     }
 
+    override fun listExistingObjects(
+        namespace: String?,
+        streamName: String,
+        objectPath: String,
+        pathFormat: String
+    ): List<String> {
+        val regexFormat: Pattern =
+            Pattern.compile(getRegexFormat(namespace, streamName, pathFormat))
+        var keys = listOf<String>()
+        var objects: ObjectListing = listObjects(objectPath)
+        while (objects.objectSummaries.size > 0) {
+            keys =
+                keys +
+                    objects.objectSummaries
+                        .filter { obj: S3ObjectSummary -> regexFormat.matcher(obj.key).matches() }
+                        .map { obj: S3ObjectSummary -> obj.key }
+            if (objects.isTruncated) {
+                objects = s3Client.listNextBatchOfObjects(objects)
+            } else {
+                break
+            }
+        }
+        return keys
+    }
+
+    override fun cleanUpObjects(keysToDelete: List<String>) {
+        val bucket: String? = s3Config.bucketName
+        cleanUpObjects(bucket, keysToDelete.map { DeleteObjectsRequest.KeyVersion(it) })
+    }
+
     protected open fun cleanUpObjects(
         bucket: String?,
         keysToDelete: List<DeleteObjectsRequest.KeyVersion>
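The paging loop above relies on the AWS SDK for Java v1 listing API. Here is a standalone sketch of the same list-then-delete pattern against a raw AmazonS3 client; the bucket name, prefix, and regex in main are placeholders, and error handling and credentials setup are omitted:

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import java.util.regex.Pattern

fun listMatchingKeys(s3: AmazonS3, bucket: String, prefix: String, regex: Pattern): List<String> {
    val keys = mutableListOf<String>()
    var listing = s3.listObjects(bucket, prefix)
    while (true) {
        // Keep only the keys whose full path matches the generation's path regex.
        listing.objectSummaries
            .filter { regex.matcher(it.key).matches() }
            .mapTo(keys) { it.key }
        if (!listing.isTruncated) break
        // listObjects returns at most 1000 keys per page; fetch the next page.
        listing = s3.listNextBatchOfObjects(listing)
    }
    return keys
}

fun deleteKeys(s3: AmazonS3, bucket: String, keys: List<String>) {
    if (keys.isEmpty()) return // avoid an unnecessary S3 call, as the commit does
    // DeleteObjects accepts at most 1000 keys per request, so delete in chunks.
    keys.chunked(1000).forEach { chunk ->
        val request =
            DeleteObjectsRequest(bucket)
                .withKeys(chunk.map { DeleteObjectsRequest.KeyVersion(it) })
        s3.deleteObjects(request)
    }
}

fun main() {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val keys = listMatchingKeys(s3, "my-bucket", "airbyte/", Pattern.compile(".*\\.csv"))
    deleteKeys(s3, "my-bucket", keys)
}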

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/WriteConfig.kt (+2, -8)
@@ -18,6 +18,7 @@ constructor(
     val generationId: Long?,
     val minimumGenerationId: Long?,
     val storedFiles: MutableList<String> = arrayListOf(),
+    val objectsFromOldGeneration: MutableList<String> = arrayListOf()
 ) {
 
     fun addStoredFile(file: String) {
@@ -29,13 +30,6 @@ constructor(
     }
 
     override fun toString(): String {
-        return "WriteConfig{" +
-            "streamName=$streamName" +
-            ", namespace=$namespace" +
-            ", outputBucketPath=$outputBucketPath" +
-            ", pathFormat=$pathFormat" +
-            ", fullOutputPath=$fullOutputPath" +
-            ", syncMode=$syncMode" +
-            '}'
+        return "WriteConfig(namespace=$namespace, streamName='$streamName', outputBucketPath='$outputBucketPath', pathFormat='$pathFormat', fullOutputPath='$fullOutputPath', syncMode=$syncMode, generationId=$generationId, minimumGenerationId=$minimumGenerationId)"
     }
 }

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt (+60)
@@ -451,6 +451,66 @@ protected constructor(
         )
     }
 
+    /** Runs two failed syncs and verifies that the previous syncs' objects are not cleaned up. */
+    @Test
+    fun testOverwriteSyncMultipleFailedGenerationsFilesPreserved() {
+        assumeTrue(
+            implementsOverwrite(),
+            "Destination's spec.json does not support overwrite sync mode."
+        )
+        val config = getConfig()
+
+        // Run the first failed attempt of a generation
+        val catalogPair =
+            getTestCatalog(SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE, 42, 12, 12)
+        val firstSyncMessages: List<AirbyteMessage> =
+            getFirstSyncMessagesFixture1(
+                catalogPair.first,
+                AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE
+            )
+        try {
+            runSyncAndVerifyStateOutput(config, firstSyncMessages, catalogPair.first, false)
+            fail { "The sync should not succeed when the Trace message is INCOMPLETE" }
+        } catch (_: TestHarnessException) {}
+
+        // Run a second failed attempt of the same generation
+        val catalogPair2 =
+            getTestCatalog(SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE, 43, 12, 12)
+        val secondSyncMessages =
+            getFirstSyncMessagesFixture1(
+                catalogPair2.first,
+                AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE
+            )
+        try {
+            runSyncAndVerifyStateOutput(config, secondSyncMessages, catalogPair2.first, false)
+            fail { "The sync should not succeed when the Trace message is INCOMPLETE" }
+        } catch (_: TestHarnessException) {}
+
+        // Verify that our delayed-delete logic causes no data downtime: records from
+        // both failed attempts are still present.
+        val defaultSchema = getDefaultSchema(config)
+        retrieveRawRecordsAndAssertSameMessages(
+            catalogPair.second,
+            firstSyncMessages + secondSyncMessages,
+            defaultSchema
+        )
+
+        // This doesn't happen in the real world, but it verifies that old data is
+        // disregarded once the generationId is incremented.
+        // Run a successful sync with an incremented generationId; this should remove all
+        // the old-generation files that were preserved.
+        val catalogPair3 =
+            getTestCatalog(SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE, 43, 13, 13)
+        val thirdSyncMessages = getSyncMessagesFixture2()
+        runSyncAndVerifyStateOutput(config, thirdSyncMessages, catalogPair3.first, false)
+
+        retrieveRawRecordsAndAssertSameMessages(
+            catalogPair.second,
+            thirdSyncMessages,
+            defaultSchema
+        )
+    }
+
     /**
      * Test runs 2 successful OVERWRITE syncs but with same generation and a sync to another catalog
      * with no generationId, this shouldn't happen from platform but acts as a simulation for