
Commit a98f521

bigquery implement refreshes logic
1 parent 53d7c23 commit a98f521

File tree

8 files changed: +293 -30 lines changed


airbyte-integrations/connectors/destination-bigquery/build.gradle

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ airbyteJavaConnector {
     'gcs-destinations',
     'core',
   ]
-  useLocalCdk = false
+  useLocalCdk = true
 }

 java {

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

Lines changed: 2 additions & 1 deletion

@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.6.2
+  dockerImageTag: 2.7.0
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery
@@ -31,6 +31,7 @@ data:
     memory_request: 1Gi
   supportLevel: certified
   supportsDbt: true
+  supportsRefreshes: true
  tags:
    - language:java
  connectorTestSuitesOptions:

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java

Lines changed: 8 additions & 5 deletions

@@ -35,6 +35,7 @@
 import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
 import io.airbyte.commons.exceptions.ConfigErrorException;
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation;
 import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -85,15 +86,15 @@ public boolean isFinalTableEmpty(final StreamId id) {
     return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.getFinalNamespace(), id.getFinalName())).getNumRows());
   }

-  public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
-    final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName()));
+  public InitialRawTableStatus getInitialRawTableState(final StreamId id, final String suffix) throws Exception {
+    final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName() + suffix));
     if (rawTable == null) {
       // Table doesn't exist. There are no unprocessed records, and no timestamp.
       return new InitialRawTableStatus(false, false, Optional.empty());
     }

     final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE, suffix))).replace(
         // bigquery timestamps have microsecond precision
         """
         SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -109,7 +110,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
     }

     final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE, suffix))).replace(
         """
         SELECT MAX(_airbyte_extracted_at)
         FROM ${raw_table}
@@ -199,11 +200,13 @@ public List<DestinationInitialStatus<BigQueryDestinationState>> gatherInitialSta
     for (final StreamConfig streamConfig : streamConfigs) {
       final StreamId id = streamConfig.getId();
       final Optional<TableDefinition> finalTable = findExistingTable(id);
-      final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+      final InitialRawTableStatus rawTableState = getInitialRawTableState(id, "");
+      final InitialRawTableStatus tempRawTableState = getInitialRawTableState(id, AbstractStreamOperation.TMP_TABLE_SUFFIX);
       initialStates.add(new DestinationInitialStatus<>(
           streamConfig,
           finalTable.isPresent(),
           rawTableState,
+          tempRawTableState,
           finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
           finalTable.isEmpty() || isFinalTableEmpty(id),
           // Return a default state blob since we don't actually track state.
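
Note: getInitialRawTableState is now suffix-aware, so gatherInitialStates can probe both the live raw table (suffix "") and the temp raw table left behind by an interrupted refresh (AbstractStreamOperation.TMP_TABLE_SUFFIX). As a minimal Kotlin sketch of the kind of probe this performs, assuming an authenticated BigQuery client; the hasUnprocessedRecords helper and its parameters are illustrative, not part of this commit:

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.QueryJobConfiguration

// Illustrative only: mirrors the "unprocessed records" half of
// getInitialRawTableState(id, suffix). A NULL MIN(_airbyte_extracted_at)
// means the (possibly suffixed) raw table holds no unloaded records.
fun hasUnprocessedRecords(bq: BigQuery, namespace: String, name: String, suffix: String): Boolean {
    val result = bq.query(
        QueryJobConfiguration.of(
            """
            SELECT MIN(_airbyte_extracted_at) AS min_unloaded
            FROM `$namespace`.`$name$suffix`
            WHERE _airbyte_loaded_at IS NULL
            """.trimIndent()
        )
    )
    return !result.iterateAll().first()["min_unloaded"].isNull
}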

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ class BigQueryDirectLoadingStorageOperation(
     override fun writeToStage(streamConfig: StreamConfig, data: Stream<PartialAirbyteMessage>) {
         // TODO: why do we need ratelimiter, and using unstable API from Google's guava
         rateLimiter.acquire()
-        val tableId = TableId.of(streamConfig.id.rawNamespace, streamConfig.id.rawName)
+        val tableId = tableId(streamConfig.id)
         log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
         val writeChannel = initWriteChannel(tableId)
         writeChannel.use {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt

Lines changed: 4 additions & 5 deletions

@@ -10,7 +10,6 @@ import com.google.cloud.bigquery.FormatOptions
 import com.google.cloud.bigquery.Job
 import com.google.cloud.bigquery.JobInfo
 import com.google.cloud.bigquery.LoadJobConfiguration
-import com.google.cloud.bigquery.TableId
 import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
 import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
 import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
@@ -21,7 +20,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
 import org.joda.time.DateTime
@@ -46,9 +44,10 @@ class BigQueryGcsStorageOperation(
 ) {
     private val connectionId = UUID.randomUUID()
     private val syncDateTime = DateTime.now(DateTimeZone.UTC)
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
-        super.prepareStage(streamId, destinationSyncMode)
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
+        super.prepareStage(streamId, suffix, replace)
         // prepare staging bucket
+        // TODO should this also use the suffix?
         log.info { "Creating bucket ${gcsConfig.bucketName}" }
         gcsStorageOperations.createBucketIfNotExists()
     }
@@ -75,7 +74,7 @@
     }

     private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+        val tableId = tableId(streamId)
         val stagingPath = stagingFullPath(streamId)
         val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
         log.info { "Uploading records from file $fullFilePath to target Table $tableId" }

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt

Lines changed: 67 additions & 17 deletions

@@ -5,7 +5,9 @@
 package io.airbyte.integrations.destination.bigquery.operation

 import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.QueryJobConfiguration
 import com.google.cloud.bigquery.TableId
+import com.google.cloud.bigquery.TableResult
 import io.airbyte.integrations.base.destination.operation.StorageOperation
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -14,10 +16,9 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
-import java.util.*
+import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap

 private val log = KotlinLogging.logger {}
@@ -29,39 +30,83 @@ abstract class BigQueryStorageOperation<Data>(
     protected val datasetLocation: String
 ) : StorageOperation<Data> {
     private val existingSchemas = ConcurrentHashMap.newKeySet<String>()
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
         // Prepare staging table. For overwrite, it does drop-create so we can skip explicit create.
-        if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
-            truncateStagingTable(streamId)
+        if (replace) {
+            truncateStagingTable(streamId, suffix)
         } else {
-            createStagingTable(streamId)
+            createStagingTable(streamId, suffix)
         }
     }

-    private fun createStagingTable(streamId: StreamId) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+    override fun overwriteStage(streamId: StreamId, suffix: String) {
+        bigquery.delete(tableId(streamId, ""))
+        bigquery.query(
+            QueryJobConfiguration.of(
+                """ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
+            ),
+        )
+    }
+
+    override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+        // TODO figure out how to make this work
+        // something about incompatible partitioning spec (probably b/c we're copying from a temp
+        // table partitioned on generation ID into an old real raw table partitioned on
+        // extracted_at)
+        val tempRawTable = tableId(streamId, suffix)
+        // val jobConf =
+        //     CopyJobConfiguration.newBuilder(tableId(streamId, ""), tempRawTable)
+        //         .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+        //         .build()
+        // val job = bigquery.create(JobInfo.of(jobConf))
+        // BigQueryUtils.waitForJobFinish(job)
+
+        bigquery.query(
+            QueryJobConfiguration.of(
+                """
+                INSERT INTO `${streamId.rawNamespace}`.`${streamId.rawName}`
+                SELECT * FROM `${streamId.rawNamespace}`.`${streamId.rawName}$suffix`
+                """.trimIndent()
+            )
+        )
+        bigquery.delete(tempRawTable)
+    }
+
+    override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+        val result: TableResult =
+            bigquery.query(
+                QueryJobConfiguration.of(
+                    "SELECT _airbyte_generation_id FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix LIMIT 1"
+                ),
+            )
+        if (result.totalRows == 0L) {
+            return null
+        }
+        return result.iterateAll().first()["_airbyte_generation_id"].longValue
+    }
+
+    private fun createStagingTable(streamId: StreamId, suffix: String) {
         BigQueryUtils.createPartitionedTableIfNotExists(
             bigquery,
-            tableId,
-            BigQueryRecordFormatter.SCHEMA_V2
+            tableId(streamId, suffix),
+            BigQueryRecordFormatter.SCHEMA_V2,
         )
     }

-    private fun dropStagingTable(streamId: StreamId) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
-        bigquery.delete(tableId)
+    private fun dropStagingTable(streamId: StreamId, suffix: String) {
+        bigquery.delete(tableId(streamId, suffix))
     }

     /**
      * "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where
      * the table's partition filter must be turned off to truncate. Since deleting a table is a free
      * operation this option re-uses functions that already exist
      */
-    private fun truncateStagingTable(streamId: StreamId) {
+    private fun truncateStagingTable(streamId: StreamId, suffix: String) {
         val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
         log.info { "Truncating raw table $tableId" }
-        dropStagingTable(streamId)
-        createStagingTable(streamId)
+        dropStagingTable(streamId, suffix)
+        createStagingTable(streamId, suffix)
     }

     override fun cleanupStage(streamId: StreamId) {
@@ -91,7 +136,7 @@ abstract class BigQueryStorageOperation<Data>(
             }"
         }
         destinationHandler.execute(
-            sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
+            sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix),
         )
     }
 }
@@ -109,4 +154,9 @@
             finalTableSuffix,
         )
     }
+
+    companion object {
+        fun tableId(streamId: StreamId, suffix: String = ""): TableId =
+            TableId.of(streamId.rawNamespace, streamId.rawName + suffix)
+    }
 }
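
Taken together, these overrides give the CDK's refresh orchestration three primitives: probe a temp stage's generation, stage into a suffixed raw table, then either swap the temp table in (truncate refresh) or merge it back (the still-TODO transferFromTempStage). A hedged sketch of how a caller might drive them; the real sequencing lives in the CDK's AbstractStreamOperation, and the suffix value and generation-ID plumbing here are assumptions:

import io.airbyte.integrations.base.destination.typing_deduping.StreamId

// Hypothetical driver, not part of this commit: the actual orchestration is CDK-side.
fun <T> runTruncateRefresh(
    op: BigQueryStorageOperation<T>,
    streamId: StreamId,
    currentGenerationId: Long,
) {
    val suffix = "_airbyte_tmp" // assumed value of AbstractStreamOperation.TMP_TABLE_SUFFIX
    // Wipe the temp stage only if it holds data from an older generation;
    // a matching generation means an interrupted refresh can be resumed.
    val stagedGeneration = op.getStageGeneration(streamId, suffix)
    val replace = stagedGeneration != null && stagedGeneration != currentGenerationId
    op.prepareStage(streamId, suffix, replace)
    // ... records for this generation are written into the suffixed raw table ...
    // On a successful sync, drop the old raw table and rename the temp one into place.
    op.overwriteStage(streamId, suffix)
}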
