Skip to content

Commit e3cc022

Browse files
authored
Destinations CDK: Correctly detect when real raw/final table is correct generation during truncate sync (#42503)
1 parent a7863a0 commit e3cc022

File tree

5 files changed

+364
-24
lines changed

5 files changed

+364
-24
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.44.14 | 2024-08-19 | [\#42503](https://github.com/airbytehq/airbyte/pull/42503) | Destinations (refreshes) - correctly detect existing raw/final table of the correct generation during truncate sync |
177178
| 0.44.13 | 2024-08-14 | [\#42579](https://github.com/airbytehq/airbyte/pull/42579) | S3 destination - OVERWRITE: keep files until successful sync of same generationId |
178179
| 0.44.5 | 2024-08-09 | [\#43374](https://github.com/airbytehq/airbyte/pull/43374) | S3 destination V2 fields, conversion improvements, bugfixes |
179180
| 0.44.4 | 2024-08-08 | [\#43410](https://github.com/airbytehq/airbyte/pull/43330) | Better logs for counting info to state message. |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.44.13
1+
version=0.44.14

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

+106-14
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
5454
}
5555

5656
if (isTruncateSync) {
57-
prepareStageForTruncate(destinationInitialStatus, stream)
58-
rawTableSuffix = TMP_TABLE_SUFFIX
59-
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
57+
val (rawTableStatus, suffix) = prepareStageForTruncate(destinationInitialStatus, stream)
58+
initialRawTableStatus = rawTableStatus
59+
rawTableSuffix = suffix
6060
} else {
6161
rawTableSuffix = NO_SUFFIX
6262
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
@@ -132,7 +132,17 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
132132
private fun prepareStageForTruncate(
133133
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
134134
stream: StreamConfig
135-
) {
135+
): Pair<InitialRawTableStatus, String> {
136+
/*
137+
tl;dr:
138+
* if a temp raw table exists, check whether it belongs to the correct generation.
139+
* if wrong generation, truncate it.
140+
* regardless, write into the temp raw table.
141+
* else, if a real raw table exists, check its generation.
142+
* if wrong generation, write into a new temp raw table.
143+
* else, write into the preexisting real raw table.
144+
* else, create a new temp raw table and write into it.
145+
*/
136146
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
137147
val tempStageGeneration =
138148
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
@@ -146,6 +156,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
146156
stream.id,
147157
TMP_TABLE_SUFFIX,
148158
)
159+
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
149160
} else {
150161
log.info {
151162
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and existing temp raw table belongs to generation $tempStageGeneration (!= current generation ${stream.generationId}). Truncating it."
@@ -156,18 +167,67 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
156167
TMP_TABLE_SUFFIX,
157168
replace = true,
158169
)
170+
// We nuked the temp raw table, so create a new initial raw table status.
171+
return Pair(
172+
InitialRawTableStatus(
173+
rawTableExists = true,
174+
hasUnprocessedRecords = false,
175+
maxProcessedTimestamp = Optional.empty(),
176+
),
177+
TMP_TABLE_SUFFIX,
178+
)
179+
}
180+
} else if (destinationInitialStatus.initialRawTableStatus.rawTableExists) {
181+
// It's possible to "resume" a truncate sync that was previously already finalized.
182+
// In this case, there is no existing temp raw table, and there is a real raw table
183+
// which already belongs to the correct generation.
184+
// Check for that case now.
185+
val realStageGeneration = storageOperation.getStageGeneration(stream.id, NO_SUFFIX)
186+
if (realStageGeneration == null || realStageGeneration == stream.generationId) {
187+
log.info {
188+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, no existing temp raw table, and existing real raw table belongs to generation $realStageGeneration (== current generation ${stream.generationId}). Retaining it."
189+
}
190+
// The real raw table is from the correct generation. Set up any other resources
191+
// (staging file, etc.), but leave the table untouched.
192+
storageOperation.prepareStage(stream.id, NO_SUFFIX)
193+
return Pair(destinationInitialStatus.initialRawTableStatus, NO_SUFFIX)
194+
} else {
195+
log.info {
196+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, existing real raw table belongs to generation $realStageGeneration (!= current generation ${stream.generationId}), and no preexisting temp raw table. Creating a temp raw table."
197+
}
198+
// We're initiating a new truncate refresh. Create a new temp stage.
199+
storageOperation.prepareStage(
200+
stream.id,
201+
TMP_TABLE_SUFFIX,
202+
)
203+
return Pair(
204+
// Create a fresh raw table status, since we created a fresh temp stage.
205+
InitialRawTableStatus(
206+
rawTableExists = true,
207+
hasUnprocessedRecords = false,
208+
maxProcessedTimestamp = Optional.empty(),
209+
),
210+
TMP_TABLE_SUFFIX,
211+
)
159212
}
160-
// (if the existing temp stage is from the correct generation, then we're resuming
161-
// a truncate refresh, and should keep the previous temp stage).
162213
} else {
163214
log.info {
164-
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it."
215+
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp or raw table. Creating a temp raw table."
165216
}
166217
// We're initiating a new truncate refresh. Create a new temp stage.
167218
storageOperation.prepareStage(
168219
stream.id,
169220
TMP_TABLE_SUFFIX,
170221
)
222+
return Pair(
223+
// Create a fresh raw table status, since we created a fresh temp stage.
224+
InitialRawTableStatus(
225+
rawTableExists = true,
226+
hasUnprocessedRecords = false,
227+
maxProcessedTimestamp = Optional.empty(),
228+
),
229+
TMP_TABLE_SUFFIX,
230+
)
171231
}
172232
}
173233

@@ -188,8 +248,39 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
188248
// The table already exists. Decide whether we're writing to it directly, or
189249
// using a tmp table.
190250
if (isTruncateSync) {
191-
// Truncate refresh. Use a temp final table.
192-
return prepareFinalTableForOverwrite(initialStatus)
251+
if (initialStatus.isFinalTableEmpty || initialStatus.finalTableGenerationId == null) {
252+
if (!initialStatus.isSchemaMismatch) {
253+
log.info {
254+
"Truncate sync, and final table is empty and has correct schema. Writing to it directly."
255+
}
256+
return NO_SUFFIX
257+
} else {
258+
// No point soft resetting an empty table. We'll just do an overwrite later.
259+
log.info {
260+
"Truncate sync, and final table is empty, but has the wrong schema. Using a temp final table."
261+
}
262+
return prepareFinalTableForOverwrite(initialStatus)
263+
}
264+
} else if (
265+
initialStatus.finalTableGenerationId >=
266+
initialStatus.streamConfig.minimumGenerationId
267+
) {
268+
if (!initialStatus.isSchemaMismatch) {
269+
log.info {
270+
"Truncate sync, and final table matches our generation and has correct schema. Writing to it directly."
271+
}
272+
return NO_SUFFIX
273+
} else {
274+
log.info {
275+
"Truncate sync, and final table matches our generation, but has the wrong schema. Writing to it directly, but triggering a soft reset first."
276+
}
277+
storageOperation.softResetFinalTable(stream)
278+
return NO_SUFFIX
279+
}
280+
} else {
281+
// The final table is in the wrong generation. Use a temp final table.
282+
return prepareFinalTableForOverwrite(initialStatus)
283+
}
193284
} else {
194285
if (initialStatus.isSchemaMismatch || initialStatus.destinationState.needsSoftReset()) {
195286
// We're loading data directly into the existing table.
@@ -257,14 +348,14 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
257348
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName
258349
// + suffix))`
259350
// but annoying and confusing.
260-
if (isTruncateSync && streamSuccessful) {
351+
if (isTruncateSync && streamSuccessful && rawTableSuffix.isNotEmpty()) {
261352
log.info {
262-
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message."
353+
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and are using a temporary raw table."
263354
}
264355
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
265356
} else {
266357
log.info {
267-
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful"
358+
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; raw table suffix: \"$rawTableSuffix\""
268359
}
269360
}
270361

@@ -303,10 +394,11 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
303394
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} running as truncate sync. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}; temp raw table already existed: ${initialRawTableStatus.rawTableExists}; temp raw table had records: ${initialRawTableStatus.hasUnprocessedRecords}"
304395
}
305396
} else {
306-
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
397+
// When targeting the temp final table, we want to read all the raw records
398+
// because the temp final table is always a full rebuild. Typically, this is equivalent
307399
// to filtering on timestamp, but might as well be explicit.
308400
val timestampFilter =
309-
if (!isTruncateSync) {
401+
if (finalTmpTableSuffix.isEmpty()) {
310402
initialRawTableStatus.maxProcessedTimestamp
311403
} else {
312404
Optional.empty()

0 commit comments

Comments
 (0)