Skip to content

Commit b20365a

Browse files
authored
Destination bigquery: pull in cdk update for refreshes bugfix (#42504)
1 parent 9eccb24 commit b20365a

File tree

14 files changed

+152
-29
lines changed

14 files changed

+152
-29
lines changed

airbyte-integrations/connectors/destination-bigquery/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.41.4'
6+
cdkVersionRequired = '0.44.14'
77
features = [
88
'db-destinations',
99
'datastore-bigquery',

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
8-
dockerImageTag: 2.8.6
8+
dockerImageTag: 2.8.7
99
dockerRepository: airbyte/destination-bigquery
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
1111
githubIssueLabel: destination-bigquery

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
3838
import io.airbyte.integrations.base.destination.operation.StandardStreamOperation
3939
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
4040
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
41+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
4142
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
4243
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
4344
import io.airbyte.integrations.base.destination.typing_deduping.Sql
@@ -58,7 +59,6 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
5859
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
5960
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
6061
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
61-
import io.airbyte.protocol.models.v0.DestinationSyncMode
6262
import io.github.oshai.kotlinlogging.KotlinLogging
6363
import java.io.ByteArrayInputStream
6464
import java.io.IOException
@@ -174,12 +174,12 @@ class BigQueryDestination : BaseConnector(), Destination {
174174
val streamConfig =
175175
StreamConfig(
176176
id = streamId,
177-
destinationSyncMode = DestinationSyncMode.OVERWRITE,
177+
postImportAction = ImportType.APPEND,
178178
primaryKey = listOf(),
179179
cursor = Optional.empty(),
180180
columns = linkedMapOf(),
181-
generationId = 0,
182-
minimumGenerationId = 0,
181+
generationId = 1,
182+
minimumGenerationId = 1,
183183
syncId = 0
184184
)
185185

@@ -206,7 +206,9 @@ class BigQueryDestination : BaseConnector(), Destination {
206206
),
207207
isSchemaMismatch = true,
208208
isFinalTableEmpty = true,
209-
destinationState = BigQueryDestinationState(needsSoftReset = false)
209+
destinationState = BigQueryDestinationState(needsSoftReset = false),
210+
finalTempTableGenerationId = null,
211+
finalTableGenerationId = null,
210212
)
211213

212214
// We simulate a mini-sync to verify that the raw table code path is exercised, and disable T+D

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt

+10-3
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,27 @@ class BigQueryGcsStorageOperation(
6565
suffix: String,
6666
data: SerializableBuffer
6767
) {
68-
val stagedFileName: String = uploadRecordsToStage(streamConfig.id, suffix, data)
68+
val stagedFileName: String =
69+
uploadRecordsToStage(streamConfig.id, suffix, data, streamConfig.generationId)
6970
copyIntoTableFromStage(streamConfig.id, suffix, stagedFileName)
7071
}
7172

7273
private fun uploadRecordsToStage(
7374
streamId: StreamId,
7475
suffix: String,
75-
buffer: SerializableBuffer
76+
buffer: SerializableBuffer,
77+
generationId: Long,
7678
): String {
7779
val objectPath: String = stagingFullPath(streamId)
7880
log.info {
7981
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName}$suffix to path $objectPath"
8082
}
81-
return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
83+
return gcsStorageOperations.uploadRecordsToBucket(
84+
buffer,
85+
streamId.rawNamespace,
86+
objectPath,
87+
generationId
88+
)
8289
}
8390

8491
private fun copyIntoTableFromStage(streamId: StreamId, suffix: String, stagedFileName: String) {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.google.cloud.bigquery.BigQuery
88
import com.google.cloud.bigquery.QueryJobConfiguration
99
import com.google.cloud.bigquery.TableId
1010
import com.google.cloud.bigquery.TableResult
11+
import io.airbyte.cdk.integrations.base.JavaBaseConstants
1112
import io.airbyte.integrations.base.destination.operation.StorageOperation
1213
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
1314
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -90,7 +91,12 @@ abstract class BigQueryStorageOperation<Data>(
9091
if (result.totalRows == 0L) {
9192
return null
9293
}
93-
return result.iterateAll().first()["_airbyte_generation_id"].longValue
94+
val value = result.iterateAll().first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
95+
return if (value == null || value.isNull) {
96+
0
97+
} else {
98+
value.longValue
99+
}
94100
}
95101

96102
private fun createStagingTable(streamId: StreamId, suffix: String) {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.kt

+43-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants
1111
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
1212
import io.airbyte.commons.exceptions.ConfigErrorException
1313
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
14+
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
1415
import io.airbyte.integrations.base.destination.typing_deduping.*
1516
import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase
1617
import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase
@@ -60,7 +61,7 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
6061
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
6162
FROM ${'$'}{raw_table}
6263
WHERE _airbyte_loaded_at IS NULL
63-
64+
6465
""".trimIndent()
6566
)
6667
)
@@ -95,7 +96,7 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
9596
"""
9697
SELECT MAX(_airbyte_extracted_at)
9798
FROM ${'$'}{raw_table}
98-
99+
99100
""".trimIndent()
100101
)
101102
)
@@ -118,6 +119,43 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
118119
}
119120
}
120121

122+
private fun getFinalTableGeneration(id: StreamId, suffix: String): Long? {
123+
val finalTable = bq.getTable(TableId.of(id.finalNamespace, id.finalName + suffix))
124+
if (finalTable == null || !finalTable.exists()) {
125+
return null
126+
}
127+
128+
val tableDef = finalTable.getDefinition<StandardTableDefinition>()
129+
val hasGenerationId: Boolean =
130+
tableDef.schema
131+
?.fields
132+
// Field doesn't have a hasColumn(String) method >.>
133+
?.any { it.name == JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID }
134+
?: false
135+
if (!hasGenerationId) {
136+
return null
137+
}
138+
139+
val result =
140+
bq.query(
141+
QueryJobConfiguration.of(
142+
"""
143+
SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}
144+
FROM ${id.finalTableId(BigQuerySqlGenerator.QUOTE, suffix)}
145+
""".trimIndent()
146+
)
147+
)
148+
if (result.totalRows == 0L) {
149+
return null
150+
}
151+
val value = result.iterateAll().first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
152+
return if (value == null || value.isNull) {
153+
0
154+
} else {
155+
value.longValue
156+
}
157+
}
158+
121159
@Throws(InterruptedException::class)
122160
override fun execute(sql: Sql) {
123161
val transactions = sql.asSqlStrings("BEGIN TRANSACTION", "COMMIT TRANSACTION")
@@ -234,7 +272,9 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
234272
isFinalTableEmpty(
235273
id
236274
), // Return a default state blob since we don't actually track state.
237-
BigQueryDestinationState(false)
275+
BigQueryDestinationState(false),
276+
finalTableGenerationId = getFinalTableGeneration(id, ""),
277+
finalTempTableGenerationId = getFinalTableGeneration(id, TMP_TABLE_SUFFIX)
238278
)
239279
)
240280
}

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import com.google.common.annotations.VisibleForTesting
88
import io.airbyte.integrations.base.destination.typing_deduping.*
99
import io.airbyte.integrations.base.destination.typing_deduping.Array
1010
import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer
11-
import io.airbyte.protocol.models.v0.DestinationSyncMode
1211
import java.time.Instant
1312
import java.util.*
1413
import java.util.function.Function
@@ -296,7 +295,7 @@ class BigQuerySqlGenerator
296295
useExpensiveSaferCasting: Boolean
297296
): Sql {
298297
val handleNewRecords =
299-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
298+
if (stream.postImportAction == ImportType.DEDUPE) {
300299
upsertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
301300
} else {
302301
insertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
@@ -587,7 +586,7 @@ class BigQuerySqlGenerator
587586
.collect(Collectors.joining("\n"))
588587
val extractedAtCondition = buildExtractedAtCondition(minRawTimestamp)
589588

590-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
589+
if (stream.postImportAction == ImportType.DEDUPE) {
591590
// When deduping, we need to dedup the raw records. Note the row_number() invocation in
592591
// the SQL
593592
// statement. Do the same extract+cast CTE + airbyte_meta construction as in non-dedup
@@ -929,7 +928,7 @@ class BigQuerySqlGenerator
929928

930929
fun clusteringColumns(stream: StreamConfig): List<String> {
931930
val clusterColumns: MutableList<String> = ArrayList()
932-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
931+
if (stream.postImportAction == ImportType.DEDUPE) {
933932
// We're doing de-duping, therefore we have a primary key.
934933
// Cluster on the first 3 PK columns since BigQuery only allows up to 4 clustering
935934
// columns,

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperationTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM
1717
import io.airbyte.commons.json.Jsons
1818
import io.airbyte.commons.string.Strings
1919
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
20+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
2021
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2122
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
2223
import io.airbyte.integrations.destination.bigquery.BigQueryConsts
@@ -27,7 +28,6 @@ import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlG
2728
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGeneratorIntegrationTest
2829
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
2930
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
30-
import io.airbyte.protocol.models.v0.DestinationSyncMode
3131
import java.nio.file.Files
3232
import java.nio.file.Path
3333
import java.util.Optional
@@ -62,7 +62,7 @@ class BigQueryDirectLoadingStorageOperationTest {
6262
private val streamConfig =
6363
StreamConfig(
6464
streamId,
65-
DestinationSyncMode.APPEND,
65+
ImportType.APPEND,
6666
emptyList(),
6767
Optional.empty(),
6868
LinkedHashMap(),

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.kt

+59
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,65 @@ abstract class AbstractBigQueryTypingDedupingTest : BaseTypingDedupingTest() {
290290
}
291291
}
292292

293+
@Test
294+
fun testGenerationIdMigrationForOverwrite() {
295+
// First sync
296+
val catalog1 =
297+
ConfiguredAirbyteCatalog()
298+
.withStreams(
299+
listOf(
300+
ConfiguredAirbyteStream()
301+
.withSyncMode(SyncMode.FULL_REFRESH)
302+
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
303+
.withSyncId(41L)
304+
.withGenerationId(42L)
305+
.withMinimumGenerationId(0L)
306+
.withStream(
307+
AirbyteStream()
308+
.withNamespace(streamNamespace)
309+
.withName(streamName)
310+
.withJsonSchema(SCHEMA),
311+
),
312+
),
313+
)
314+
val messages1 = readMessages("dat/sync1_messages.jsonl")
315+
runSync(
316+
catalog1,
317+
messages1,
318+
"airbyte/destination-bigquery:2.4.20",
319+
// Old connector version can't handle TRACE messages; disable the
320+
// stream status message
321+
streamStatus = null,
322+
)
323+
324+
// Second sync
325+
val catalog2 =
326+
ConfiguredAirbyteCatalog()
327+
.withStreams(
328+
listOf(
329+
ConfiguredAirbyteStream()
330+
.withSyncMode(SyncMode.FULL_REFRESH)
331+
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
332+
.withSyncId(42L)
333+
.withGenerationId(43L)
334+
.withMinimumGenerationId(43L)
335+
.withStream(
336+
AirbyteStream()
337+
.withNamespace(streamNamespace)
338+
.withName(streamName)
339+
.withJsonSchema(SCHEMA),
340+
),
341+
),
342+
)
343+
val messages2 = readMessages("dat/sync2_messages.jsonl")
344+
runSync(catalog2, messages2)
345+
346+
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl")
347+
val expectedFinalRecords2 =
348+
readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
349+
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
350+
}
351+
293352
protected open val rawDataset: String
294353
/**
295354
* Subclasses using a config with a nonstandard raw table dataset should override this

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.*
1616
import io.airbyte.integrations.destination.bigquery.BigQueryConsts
1717
import io.airbyte.integrations.destination.bigquery.BigQueryDestination.Companion.getBigQuery
1818
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
19-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2019
import java.nio.file.Files
2120
import java.nio.file.Path
2221
import java.time.Duration
@@ -509,7 +508,7 @@ class BigQuerySqlGeneratorIntegrationTest :
509508
val stream =
510509
StreamConfig(
511510
streamId,
512-
DestinationSyncMode.APPEND,
511+
ImportType.APPEND,
513512
emptyList(),
514513
Optional.empty(),
515514
columns,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Only sync2 messages present in overwrite mode
2+
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
3+
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
4+
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}

airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class BigQuerySqlGeneratorTest {
3333
Assertions.assertEquals(
3434
StreamConfig(
3535
StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
36-
DestinationSyncMode.APPEND,
36+
ImportType.APPEND,
3737
emptyList(),
3838
Optional.empty(),
3939
columns,

0 commit comments

Comments (0)