@@ -69,19 +69,20 @@ class S3ConsumerFactory {
69
69
val stream = writeConfig.streamName
70
70
val outputBucketPath = writeConfig.outputBucketPath
71
71
val pathFormat = writeConfig.pathFormat
72
- if (mustCleanUpExistingObjects (writeConfig, storageOperations )) {
72
+ if (! isAppendSync (writeConfig)) {
73
73
LOGGER .info {
74
- " Clearing storage area in destination started for namespace $namespace " +
74
+ " Listing objects to cleanup for namespace $namespace " +
75
75
" stream $stream bucketObject $outputBucketPath pathFormat $pathFormat "
76
76
}
77
- storageOperations.cleanUpBucketObject (
78
- namespace,
79
- stream ,
80
- outputBucketPath ,
81
- pathFormat
77
+ writeConfig.objectsFromOldGeneration.addAll (
78
+ keysForOverwriteDeletion(
79
+ writeConfig ,
80
+ storageOperations ,
81
+ ),
82
82
)
83
83
LOGGER .info {
84
- " Clearing storage area in destination completed for namespace $namespace stream $stream bucketObject $outputBucketPath "
84
+ " Marked ${writeConfig.objectsFromOldGeneration.size} keys for deletion at end of sync " +
85
+ " for namespace $namespace stream $stream bucketObject $outputBucketPath "
85
86
}
86
87
} else {
87
88
LOGGER .info {
@@ -140,17 +141,43 @@ class S3ConsumerFactory {
140
141
storageOperations : BlobStorageOperations ,
141
142
writeConfigs : List <WriteConfig >
142
143
): OnCloseFunction {
143
- return OnCloseFunction { hasFailed: Boolean , _: Map <StreamDescriptor , StreamSyncSummary > ->
144
- if (hasFailed) {
145
- LOGGER .info { " Cleaning up destination started for ${writeConfigs.size} streams" }
146
- for (writeConfig in writeConfigs) {
147
- storageOperations.cleanUpBucketObject(
148
- writeConfig.fullOutputPath,
149
- writeConfig.storedFiles
150
- )
151
- writeConfig.clearStoredFiles()
144
+
145
+ val streamDescriptorToWriteConfig =
146
+ writeConfigs.associateBy {
147
+ StreamDescriptor ().withNamespace(it.namespace).withName(it.streamName)
148
+ }
149
+ return OnCloseFunction {
150
+ _: Boolean ,
151
+ streamSyncSummaries: Map <StreamDescriptor , StreamSyncSummary > ->
152
+ // On stream success clean up the objects marked for deletion per stream. This is done
153
+ // on per-stream basis
154
+ streamSyncSummaries.forEach { (streamDescriptor, streamSummary) ->
155
+ val streamSuccessful =
156
+ streamSummary.terminalStatus ==
157
+ AirbyteStreamStatusTraceMessage .AirbyteStreamStatus .COMPLETE
158
+ if (streamSuccessful) {
159
+ val writeConfig = streamDescriptorToWriteConfig[streamDescriptor]!!
160
+ if (writeConfig.objectsFromOldGeneration.isNotEmpty()) {
161
+ // Although S3API is safe to send empty list of keys, just avoiding
162
+ // unnecessary S3 call
163
+ // Logic to determine what to delete is in onStart so not doing any
164
+ // redundant checks for
165
+ // destinationSyncMode.
166
+ LOGGER .info {
167
+ " Found ${writeConfig.objectsFromOldGeneration.size} marked for deletion in namespace: ${streamDescriptor.namespace} ,stream: ${streamDescriptor.name} " +
168
+ " Proceeding with cleaning up the objects"
169
+ }
170
+ storageOperations.cleanUpObjects(writeConfig.objectsFromOldGeneration)
171
+ LOGGER .info {
172
+ " Cleaning up completed for namespace: ${streamDescriptor.namespace} ,stream: ${streamDescriptor.name} "
173
+ }
174
+ }
175
+ } else {
176
+ LOGGER .info {
177
+ " Stream not successful with status ${streamSummary.terminalStatus} for namespace: ${streamDescriptor.namespace} , name: ${streamDescriptor.name} " +
178
+ " Skipping deletion of any old objects marked for deletion."
179
+ }
152
180
}
153
- LOGGER .info { " Cleaning up destination completed." }
154
181
}
155
182
}
156
183
}
@@ -225,56 +252,74 @@ class S3ConsumerFactory {
225
252
)
226
253
}
227
254
228
- private fun mustCleanUpExistingObjects (
255
/**
 * Tells whether [writeConfig] describes an append-style sync.
 *
 * A sync is treated as append only when both of the following hold:
 * - `minimumGenerationId` equals `0L`, and
 * - the sync mode is anything other than [DestinationSyncMode.OVERWRITE].
 *
 * The caller lists existing objects for end-of-sync deletion only when this returns `false`,
 * so the sync-mode comparison acts as an extra safety net against marking every object for
 * deletion during an APPEND sync.
 *
 * @param writeConfig per-stream write configuration to inspect.
 * @return `true` when the sync is an append sync, `false` otherwise.
 */
private fun isAppendSync(writeConfig: WriteConfig): Boolean {
    // A non-zero minimum generation never qualifies as an append sync.
    if (writeConfig.minimumGenerationId != 0L) {
        return false
    }
    // With no minimum generation, only an explicit OVERWRITE mode disqualifies the sync.
    return writeConfig.syncMode != DestinationSyncMode.OVERWRITE
}
262
+
263
/**
 * Collects the keys of already-stored objects that should be deleted for the stream described
 * by [writeConfig].
 *
 * The generation recorded in storage is looked up through
 * [BlobStorageOperations.getStageGeneration] and compared with `writeConfig.generationId`:
 * - no generation found: every existing object is listed (no generation filter);
 * - generation equals the current one: the listing is invoked with the current generation as
 *   `currentGenerationId` (presumably so objects of that generation are kept while older,
 *   dangling ones are returned; confirm against `listExistingObjects`);
 * - any other generation: every existing object is listed (no generation filter).
 *
 * NOTE(review): the visible caller gates this call on `!isAppendSync(writeConfig)`, which is
 * also true when `minimumGenerationId` is `0L` and the sync mode is OVERWRITE. That combination
 * reaches the first guard below and throws. Confirm this is intended.
 *
 * @param writeConfig per-stream write configuration (namespace, stream name, bucket path, path
 *   format and generation ids are read from it).
 * @param storageOperations storage facade used to read the stored generation and to list the
 *   existing objects.
 * @return the keys returned by [BlobStorageOperations.listExistingObjects].
 * @throws IllegalArgumentException if `minimumGenerationId` is `0L`, or if it differs from
 *   `generationId` (hybrid refresh).
 */
private fun keysForOverwriteDeletion(
    writeConfig: WriteConfig,
    storageOperations: BlobStorageOperations
): List<String> {
    // Fail fast on configurations this routine must never be asked to handle.
    require(writeConfig.minimumGenerationId != 0L) {
        " Keys should not be marked for deletion when not in OVERWRITE mode"
    }
    require(writeConfig.minimumGenerationId == writeConfig.generationId) {
        " Hybrid refreshes are not yet supported."
    }

    // Truncate sync: find out whether data from the current generation is already stored.
    val namespace = writeConfig.namespace
    val stream = writeConfig.streamName
    val outputBucketPath = writeConfig.outputBucketPath
    val pathFormat = writeConfig.pathFormat
    val storedGenerationId =
        storageOperations.getStageGeneration(namespace, stream, outputBucketPath, pathFormat)

    // Generation handed to the listing call; null means "do not filter by generation".
    val generationFilter =
        when {
            storedGenerationId == null -> {
                // No generation on the stored object: assume the previous sync ran in
                // non-resumable refresh mode and clean up the files.
                LOGGER.info {
                    " Missing generationId from the lastModified object, proceeding with cleanup for stream ${writeConfig.streamName} "
                }
                null
            }
            storedGenerationId == writeConfig.generationId -> {
                // minGen == gen == stored generation: data of the current generation exists.
                LOGGER.info {
                    " Preserving data from previous sync for stream ${writeConfig.streamName} since it matches the current generation ${writeConfig.generationId} "
                }
                // Data from the T-2 sync may still be dangling if the current generation
                // failed during the T-1 sync, so the listing is filtered by generation.
                writeConfig.generationId
            }
            else -> {
                LOGGER.info {
                    " No data exists from previous sync for stream ${writeConfig.streamName} from current generation ${writeConfig.generationId} , " +
                        " proceeding to clean up existing data"
                }
                null
            }
        }

    return storageOperations.listExistingObjects(
        namespace,
        stream,
        outputBucketPath,
        pathFormat,
        currentGenerationId = generationFilter,
    )
}
279
324
280
325
companion object {
0 commit comments