Skip to content

Commit 6f61517

Browse files
committed
bigquery cdk bump
1 parent 8c07cb8 commit 6f61517

File tree

6 files changed

+20
-16
lines changed

6 files changed

+20
-16
lines changed

airbyte-integrations/connectors/destination-bigquery/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ airbyteJavaConnector {
1111
'gcs-destinations',
1212
'core',
1313
]
14-
useLocalCdk = false
14+
useLocalCdk = true
1515
}
1616

1717
java {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import com.google.common.annotations.VisibleForTesting
88
import io.airbyte.integrations.base.destination.typing_deduping.*
99
import io.airbyte.integrations.base.destination.typing_deduping.Array
1010
import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer
11-
import io.airbyte.protocol.models.v0.DestinationSyncMode
1211
import java.time.Instant
1312
import java.util.*
1413
import java.util.function.Function
@@ -296,7 +295,7 @@ class BigQuerySqlGenerator
296295
useExpensiveSaferCasting: Boolean
297296
): Sql {
298297
val handleNewRecords =
299-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
298+
if (stream.postImportAction == ImportType.DEDUPE) {
300299
upsertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
301300
} else {
302301
insertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
@@ -587,7 +586,7 @@ class BigQuerySqlGenerator
587586
.collect(Collectors.joining("\n"))
588587
val extractedAtCondition = buildExtractedAtCondition(minRawTimestamp)
589588

590-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
589+
if (stream.postImportAction == ImportType.DEDUPE) {
591590
// When deduping, we need to dedup the raw records. Note the row_number() invocation in
592591
// the SQL
593592
// statement. Do the same extract+cast CTE + airbyte_meta construction as in non-dedup
@@ -929,7 +928,7 @@ class BigQuerySqlGenerator
929928

930929
fun clusteringColumns(stream: StreamConfig): List<String> {
931930
val clusterColumns: MutableList<String> = ArrayList()
932-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
931+
if (stream.postImportAction == ImportType.DEDUPE) {
933932
// We're doing de-duping, therefore we have a primary key.
934933
// Cluster on the first 3 PK columns since BigQuery only allows up to 4 clustering
935934
// columns,

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperationTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM
1717
import io.airbyte.commons.json.Jsons
1818
import io.airbyte.commons.string.Strings
1919
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
20+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
2021
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2122
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
2223
import io.airbyte.integrations.destination.bigquery.BigQueryConsts
@@ -27,7 +28,6 @@ import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlG
2728
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGeneratorIntegrationTest
2829
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
2930
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
30-
import io.airbyte.protocol.models.v0.DestinationSyncMode
3131
import java.nio.file.Files
3232
import java.nio.file.Path
3333
import java.util.Optional
@@ -62,7 +62,7 @@ class BigQueryDirectLoadingStorageOperationTest {
6262
private val streamConfig =
6363
StreamConfig(
6464
streamId,
65-
DestinationSyncMode.APPEND,
65+
ImportType.APPEND,
6666
emptyList(),
6767
Optional.empty(),
6868
LinkedHashMap(),

airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.*
1616
import io.airbyte.integrations.destination.bigquery.BigQueryConsts
1717
import io.airbyte.integrations.destination.bigquery.BigQueryDestination.Companion.getBigQuery
1818
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
19-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2019
import java.nio.file.Files
2120
import java.nio.file.Path
2221
import java.time.Duration
@@ -509,7 +508,7 @@ class BigQuerySqlGeneratorIntegrationTest :
509508
val stream =
510509
StreamConfig(
511510
streamId,
512-
DestinationSyncMode.APPEND,
511+
ImportType.APPEND,
513512
emptyList(),
514513
Optional.empty(),
515514
columns,

airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class BigQuerySqlGeneratorTest {
3333
Assertions.assertEquals(
3434
StreamConfig(
3535
StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
36-
DestinationSyncMode.APPEND,
36+
ImportType.APPEND,
3737
emptyList(),
3838
Optional.empty(),
3939
columns,

airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@ import com.google.cloud.bigquery.StandardSQLTypeName
88
import com.google.cloud.bigquery.StandardTableDefinition
99
import com.google.cloud.bigquery.TimePartitioning
1010
import com.google.common.collect.ImmutableList
11-
import io.airbyte.integrations.base.destination.typing_deduping.*
11+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
12+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
1213
import io.airbyte.integrations.base.destination.typing_deduping.Array
14+
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
15+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
16+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
17+
import io.airbyte.integrations.base.destination.typing_deduping.Struct
18+
import io.airbyte.integrations.base.destination.typing_deduping.Union
19+
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
1320
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.clusteringMatches
1421
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.partitioningMatches
1522
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.schemaContainAllFinalTableV2AirbyteColumns
1623
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.Companion.toDialectType
17-
import io.airbyte.protocol.models.v0.DestinationSyncMode
18-
import java.util.*
24+
import java.util.Optional
1925
import java.util.stream.Collectors
2026
import java.util.stream.Stream
2127
import org.junit.jupiter.api.Assertions
@@ -52,7 +58,7 @@ class BigqueryDestinationHandlerTest {
5258
var stream =
5359
StreamConfig(
5460
Mockito.mock(),
55-
DestinationSyncMode.APPEND_DEDUP,
61+
ImportType.DEDUPE,
5662
listOf(ColumnId("foo", "bar", "fizz")),
5763
Optional.empty(),
5864
LinkedHashMap(),
@@ -75,7 +81,7 @@ class BigqueryDestinationHandlerTest {
7581
stream =
7682
StreamConfig(
7783
Mockito.mock(),
78-
DestinationSyncMode.OVERWRITE,
84+
ImportType.APPEND,
7985
emptyList(),
8086
Optional.empty(),
8187
LinkedHashMap(),
@@ -103,7 +109,7 @@ class BigqueryDestinationHandlerTest {
103109
stream =
104110
StreamConfig(
105111
Mockito.mock(),
106-
DestinationSyncMode.APPEND_DEDUP,
112+
ImportType.DEDUPE,
107113
Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e"))
108114
.map { name: String -> ColumnId(name, "foo", "bar") }
109115
.collect(Collectors.toList()),

0 commit comments

Comments
 (0)