Skip to content

Commit b9bf10d

Browse files
committed
derp
1 parent 6f61517 commit b9bf10d

File tree

2 files changed

+147
-14
lines changed

2 files changed

+147
-14
lines changed

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
5555
}
5656

5757
if (isTruncateSync) {
58-
prepareStageForTruncate(destinationInitialStatus, stream)
59-
rawTableSuffix = TMP_TABLE_SUFFIX
60-
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
58+
val (rawTableStatus, suffix) = prepareStageForTruncate(destinationInitialStatus, stream)
59+
initialRawTableStatus = rawTableStatus
60+
rawTableSuffix = suffix
6161
} else {
6262
rawTableSuffix = NO_SUFFIX
6363
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
@@ -133,7 +133,17 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
133133
private fun prepareStageForTruncate(
134134
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
135135
stream: StreamConfig
136-
) {
136+
): Pair<InitialRawTableStatus, String> {
137+
/*
138+
tl;dr:
139+
* if a temp raw table exists, check whether it belongs to the correct generation.
140+
* if wrong generation, truncate it.
141+
* regardless, write into the temp raw table.
142+
* else, if a real raw table exists, check its generation.
143+
* if wrong generation, write into a new temp raw table.
144+
* else, write into the preexisting real raw table.
145+
* else, create a new temp raw table and write into it.
146+
*/
137147
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
138148
val tempStageGeneration =
139149
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
@@ -160,15 +170,42 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
160170
}
161171
// (if the existing temp stage is from the correct generation, then we're resuming
162172
// a truncate refresh, and should keep the previous temp stage).
173+
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
174+
} else if (destinationInitialStatus.initialRawTableStatus.rawTableExists) {
175+
// It's possible to "resume" a truncate sync that was previously already finalized.
176+
// In this case, there is no existing temp raw table, and there is a real raw table
177+
// which already belongs to the correct generation.
178+
// Check for that case now.
179+
val realStageGeneration = storageOperation.getStageGeneration(stream.id, NO_SUFFIX)
180+
if (realStageGeneration == null || realStageGeneration == stream.generationId) {
181+
log.info {
182+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, no existing temp raw table, and existing real raw table belongs to generation $realStageGeneration (== current generation ${stream.generationId}). Retaining it."
183+
}
184+
// The real raw table is from the correct generation. Set up any other resources
185+
// (staging file, etc.), but leave the table untouched.
186+
storageOperation.prepareStage(stream.id, NO_SUFFIX)
187+
return Pair(destinationInitialStatus.initialRawTableStatus, NO_SUFFIX)
188+
} else {
189+
log.info {
190+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, existing real raw table belongs to generation $realStageGeneration (!= current generation ${stream.generationId}), and no preexisting temp raw table. Creating a temp raw table."
191+
}
192+
// We're initiating a new truncate refresh. Create a new temp stage.
193+
storageOperation.prepareStage(
194+
stream.id,
195+
TMP_TABLE_SUFFIX,
196+
)
197+
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
198+
}
163199
} else {
164200
log.info {
165-
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it."
201+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp or raw table. Creating a temp raw table."
166202
}
167203
// We're initiating a new truncate refresh. Create a new temp stage.
168204
storageOperation.prepareStage(
169205
stream.id,
170206
TMP_TABLE_SUFFIX,
171207
)
208+
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
172209
}
173210
}
174211

@@ -258,14 +295,14 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
258295
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName
259296
// + suffix))`
260297
// but annoying and confusing.
261-
if (isTruncateSync && streamSuccessful) {
298+
if (isTruncateSync && streamSuccessful && rawTableSuffix.isNotEmpty()) {
262299
log.info {
263-
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message."
300+
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and are using a temporary raw table."
264301
}
265302
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
266303
} else {
267304
log.info {
268-
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful"
305+
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; raw table suffix: \"$rawTableSuffix\""
269306
}
270307
}
271308

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class AbstractStreamOperationTest {
7878
val initialState =
7979
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
8080
every { streamConfig } returns this@Truncate.streamConfig
81-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
81+
every { initialRawTableStatus.rawTableExists } returns false
8282
every { initialTempRawTableStatus.rawTableExists } returns false
8383
every { isFinalTablePresent } returns false
8484
every {
@@ -122,7 +122,7 @@ class AbstractStreamOperationTest {
122122
val initialState =
123123
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
124124
every { streamConfig } returns this@Truncate.streamConfig
125-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
125+
every { initialRawTableStatus.rawTableExists } returns true
126126
every { initialTempRawTableStatus.rawTableExists } returns false
127127
every { isFinalTablePresent } returns true
128128
every { isFinalTableEmpty } returns true
@@ -134,10 +134,12 @@ class AbstractStreamOperationTest {
134134
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
135135
} returns destinationState
136136
}
137+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
137138

138139
val streamOperations = TestStreamOperation(storageOperation, initialState)
139140

140141
verifySequence {
142+
storageOperation.getStageGeneration(streamId, "")
141143
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
142144
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
143145
}
@@ -172,7 +174,7 @@ class AbstractStreamOperationTest {
172174
val initialState =
173175
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
174176
every { streamConfig } returns this@Truncate.streamConfig
175-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
177+
every { initialRawTableStatus.rawTableExists } returns true
176178
every { initialTempRawTableStatus.rawTableExists } returns false
177179
every { isFinalTablePresent } returns true
178180
every { isFinalTableEmpty } returns true
@@ -181,10 +183,12 @@ class AbstractStreamOperationTest {
181183
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
182184
} returns destinationState
183185
}
186+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
184187

185188
val streamOperations = TestStreamOperation(storageOperation, initialState)
186189

187190
verifySequence {
191+
storageOperation.getStageGeneration(streamId, "")
188192
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
189193
// No table creation - we can just reuse the existing table.
190194
}
@@ -218,18 +222,20 @@ class AbstractStreamOperationTest {
218222
val initialState =
219223
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
220224
every { streamConfig } returns this@Truncate.streamConfig
221-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
225+
every { initialRawTableStatus.rawTableExists } returns true
222226
every { initialTempRawTableStatus.rawTableExists } returns false
223227
every { isFinalTablePresent } returns true
224228
every { isFinalTableEmpty } returns false
225229
every {
226230
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
227231
} returns destinationState
228232
}
233+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
229234

230235
val streamOperations = TestStreamOperation(storageOperation, initialState)
231236

232237
verifySequence {
238+
storageOperation.getStageGeneration(streamId, "")
233239
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
234240
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
235241
}
@@ -264,14 +270,15 @@ class AbstractStreamOperationTest {
264270
val initialState =
265271
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
266272
every { streamConfig } returns this@Truncate.streamConfig
267-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
273+
every { initialRawTableStatus.rawTableExists } returns true
268274
every { initialTempRawTableStatus.rawTableExists } returns false
269275
every { isFinalTablePresent } returns true
270276
every { isFinalTableEmpty } returns false
271277
every {
272278
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
273279
} returns destinationState
274280
}
281+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
275282

276283
val streamOperations = TestStreamOperation(storageOperation, initialState)
277284
// No point in verifying setup, completely identical to existingNonEmptyTable
@@ -298,18 +305,20 @@ class AbstractStreamOperationTest {
298305
val initialState =
299306
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
300307
every { streamConfig } returns this@Truncate.streamConfig
301-
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
308+
every { initialRawTableStatus.rawTableExists } returns true
302309
every { initialTempRawTableStatus.rawTableExists } returns false
303310
every { isFinalTablePresent } returns true
304311
every { isFinalTableEmpty } returns false
305312
every {
306313
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
307314
} returns destinationState
308315
}
316+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
309317

310318
val streamOperations = TestStreamOperation(storageOperation, initialState)
311319

312320
verifySequence {
321+
storageOperation.getStageGeneration(streamId, "")
313322
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
314323
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
315324
}
@@ -481,6 +490,93 @@ class AbstractStreamOperationTest {
481490
confirmVerified(storageOperation)
482491
checkUnnecessaryStub(initialState, initialState.destinationState)
483492
}
493+
494+
@ParameterizedTest
495+
@MethodSource(
496+
"io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#generationIds"
497+
)
498+
fun existingRealRawTableMatchingGeneration(existingRealTableGeneration: Long?) {
499+
val initialState =
500+
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
501+
every { streamConfig } returns this@Truncate.streamConfig
502+
every { initialRawTableStatus.rawTableExists } returns true
503+
every { initialTempRawTableStatus.rawTableExists } returns false
504+
every { isFinalTablePresent } returns false
505+
every {
506+
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
507+
} returns destinationState
508+
}
509+
every { storageOperation.getStageGeneration(streamId, "") } returns
510+
existingRealTableGeneration
511+
512+
val streamOperations = TestStreamOperation(storageOperation, initialState)
513+
514+
verifySequence {
515+
storageOperation.getStageGeneration(streamId, "")
516+
storageOperation.prepareStage(streamId, "")
517+
storageOperation.createFinalTable(streamConfig, "", false)
518+
}
519+
confirmVerified(storageOperation)
520+
521+
clearMocks(storageOperation)
522+
streamOperations.finalizeTable(
523+
streamConfig,
524+
StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE)
525+
)
526+
527+
verifySequence {
528+
storageOperation.cleanupStage(streamId)
529+
storageOperation.typeAndDedupe(
530+
streamConfig,
531+
Optional.empty(),
532+
"",
533+
)
534+
}
535+
confirmVerified(storageOperation)
536+
checkUnnecessaryStub(initialState, initialState.destinationState)
537+
}
538+
539+
@Test
540+
fun existingRealRawTableWrongGeneration() {
541+
val initialState =
542+
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
543+
every { streamConfig } returns this@Truncate.streamConfig
544+
every { initialRawTableStatus.rawTableExists } returns true
545+
every { initialTempRawTableStatus.rawTableExists } returns false
546+
every { isFinalTablePresent } returns false
547+
every {
548+
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
549+
} returns destinationState
550+
}
551+
every { storageOperation.getStageGeneration(streamId, "") } returns -1
552+
553+
val streamOperations = TestStreamOperation(storageOperation, initialState)
554+
555+
verifySequence {
556+
storageOperation.getStageGeneration(streamId, "")
557+
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX, replace = false)
558+
storageOperation.createFinalTable(streamConfig, "", replace = false)
559+
}
560+
confirmVerified(storageOperation)
561+
562+
clearMocks(storageOperation)
563+
streamOperations.finalizeTable(
564+
streamConfig,
565+
StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE)
566+
)
567+
568+
verifySequence {
569+
storageOperation.cleanupStage(streamId)
570+
storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX)
571+
storageOperation.typeAndDedupe(
572+
streamConfig,
573+
Optional.empty(),
574+
"",
575+
)
576+
}
577+
confirmVerified(storageOperation)
578+
checkUnnecessaryStub(initialState, initialState.destinationState)
579+
}
484580
}
485581

486582
@Nested

0 commit comments

Comments
 (0)